diff --git a/integration_tests/backend/test_reads.py b/integration_tests/backend/test_reads.py
index bf8724f98..71b1d5548 100644
--- a/integration_tests/backend/test_reads.py
+++ b/integration_tests/backend/test_reads.py
@@ -6,11 +6,14 @@
 import pytest
 import requests
 import utils
+from aqueduct.constants.enums import RuntimeType
 from aqueduct.models.response_models import (
     GetArtifactResultResponse,
+    GetDagResponse,
     GetDagResultResponse,
     GetNodeArtifactResponse,
     GetNodeOperatorResponse,
+    GetNodeResultContentResponse,
     GetOperatorResultResponse,
     GetOperatorWithArtifactNodeResponse,
 )
@@ -30,6 +33,7 @@ class TestBackend:
     # V2
     GET_WORKFLOWS_TEMPLATE = "/api/v2/workflows"
 
+    GET_DAGS_TEMPLATE = "/api/v2/workflow/%s/dags"
     GET_DAG_RESULTS_TEMPLATE = "/api/v2/workflow/%s/results"
     GET_NODES_RESULTS_TEMPLATE = "/api/v2/workflow/%s/result/%s/nodes/results"
 
@@ -358,6 +362,18 @@ def test_endpoint_workflows_get(self):
                     assert key in v2_workflow
                 assert v2_workflow["user_id"] == user_id
 
+    def test_endpoint_workflow_dags_get(self):
+        flow_id, _ = self.flows["flow_with_metrics_and_checks"]
+        resp = self.get_response(self.GET_DAGS_TEMPLATE % flow_id)
+        resp = resp.json()
+
+        assert len(resp) == 2
+        for dag_dict in resp:
+            dag = GetDagResponse(**dag_dict)
+            assert dag.workflow_id == flow_id
+            assert dag.created_at != ""
+            assert dag.engine_config.type == RuntimeType.AQUEDUCT
+
     def test_endpoint_dag_results_get(self):
         flow_id, n_runs = self.flows["flow_with_metrics_and_checks"]
         resp = self.get_response(self.GET_DAG_RESULTS_TEMPLATE % flow_id).json()
@@ -496,29 +512,38 @@ def test_endpoint_node_artifact_get(self):
             assert sum(all_output_counts) == len(all_output_counts) - 1
             assert set(all_output_counts) == set([0, 1])
 
-    # TODO: ENG-2943 Investigate output
-    # >> {"error":"Unexpected error reading DAG.\nQuery returned no rows."}
-    # def test_endpoint_node_artifact_result_content_get(self):
-    #     flow_id, n_runs = self.flows["flow_with_multiple_operators"]
-    #     flow = self.client.flow(flow_id)
-    #     workflow_resp = flow._get_workflow_resp()
-    #     dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
-    #     dag_result_id = workflow_resp.workflow_dag_results[0].id
-
-    #     dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
-    #         flow_id,
-    #         dag_result_id,
-    #     )
-    #     artifact_ids = list(dag_result_resp.artifacts.keys())
-    #     artifact_id = str(artifact_ids[0])
-
-    #     resp = self.get_response(self.GET_NODE_ARTIFACT_RESULTS_TEMPLATE % (flow_id, dag_id, artifact_id)).json()
-    #     downstream_ids = [GetArtifactResultResponse(**result).id for result in resp]
-    #     for downstream_id in downstream_ids:
-    #         artifact_result_id = str(downstream_id)
-    #         resp = self.get_response(self.GET_NODE_ARTIFACT_RESULT_CONTENT_TEMPLATE % (flow_id, dag_id, artifact_id, artifact_result_id)).json()
-    #         # One of these should be successful (direct descendent of operator)
-    #         print(resp)
+    def test_endpoint_node_artifact_result_content_get(self):
+        for flow_id, _ in [
+            self.flows["flow_with_metrics_and_checks"],
+            self.flows["flow_with_multiple_operators"],
+        ]:
+            flow = self.client.flow(flow_id)
+            workflow_resp = flow._get_workflow_resp()
+            dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
+            dag_result_id = workflow_resp.workflow_dag_results[0].id
+
+            dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
+                flow_id,
+                dag_result_id,
+            )
+            artifact_ids = list(dag_result_resp.artifacts.keys())
+            artifact_id = str(artifact_ids[0])
+
+            resp = self.get_response(
+                self.GET_NODE_ARTIFACT_RESULTS_TEMPLATE % (flow_id, dag_id, artifact_id)
+            ).json()
+            downstream_ids = [GetArtifactResultResponse(**result).id for result in resp]
+            for downstream_id in downstream_ids:
+                artifact_result_id = str(downstream_id)
+                resp = self.get_response(
+                    self.GET_NODE_ARTIFACT_RESULT_CONTENT_TEMPLATE
+                    % (flow_id, dag_id, artifact_id, artifact_result_id)
+                )
+                assert resp.ok
+                resp_obj = GetNodeResultContentResponse(**resp.json())
+                # Each of these results should be fetched successfully with non-empty content.
+                assert not resp_obj.is_downsampled
+                assert len(resp_obj.content) > 0
 
     def test_endpoint_node_artifact_results_get(self):
         for flow_id, _ in [
@@ -567,24 +592,25 @@ def test_endpoint_node_operator_get(self):
             assert str(result.id) == operator_id
             assert result.dag_id == dag_id
 
-    # TODO: ENG-2943 Investigate output
-    # >> {"error":"Unexpected error reading DAG.\nQuery returned no rows."}
-    # def test_endpoint_node_operator_content_get(self):
-    #     flow_id, n_runs = self.flows["flow_with_multiple_operators"]
-    #     flow = self.client.flow(flow_id)
-    #     workflow_resp = flow._get_workflow_resp()
-    #     dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
-    #     dag_result_id = workflow_resp.workflow_dag_results[0].id
-
-    #     dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
-    #         flow_id,
-    #         dag_result_id,
-    #     )
-    #     operator_ids = list(dag_result_resp.operators.keys())
-    #     operator_id = str(operator_ids[0])
-
-    #     resp = self.get_response(self.GET_NODE_OPERATOR_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id))
-    #     print(resp.text)
+    def test_endpoint_node_operator_content_get(self):
+        flow_id, _ = self.flows["flow_with_multiple_operators"]
+        flow = self.client.flow(flow_id)
+        workflow_resp = flow._get_workflow_resp()
+        dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
+        dag_result_id = workflow_resp.workflow_dag_results[0].id
+
+        dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
+            flow_id,
+            dag_result_id,
+        )
+        operator_ids = list(dag_result_resp.operators.keys())
+        operator_id = str(operator_ids[0])
+
+        resp = self.get_response(
+            self.GET_NODE_OPERATOR_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id)
+        )
+        # The response is form data. For now, we simply check the response's status code.
+        assert resp.ok
 
     def test_endpoint_node_metric_get(self):
         flow_id, _ = self.flows["flow_with_metrics_and_checks"]
@@ -613,37 +639,47 @@ def test_endpoint_node_metric_get(self):
         assert len(result.inputs) == 1
         assert len(result.outputs) == 1
 
-    # TODO: ENG-2943 Investigate output
-    # >> {"error":"Unexpected error occurred when retrieving workflow dag.\nQuery returned no rows."}
-    # def test_endpoint_node_metric_result_content_get(self):
-    #     flow_id, _ = self.flows["flow_with_metrics_and_checks"]
-    #     flow = self.client.flow(flow_id)
-    #     workflow_resp = flow._get_workflow_resp()
-    #     dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
-    #     dag_result_id = workflow_resp.workflow_dag_results[0].id
-
-    #     dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
-    #         flow_id,
-    #         dag_result_id,
-    #     )
-    #     operator_ids = [id for id in dag_result_resp.operators.keys() if dag_result_resp.operators[id].spec.metric]
-    #     operator_id = str(operator_ids[0])
-
-    #     resp = self.get_response(
-    #         self.GET_NODE_METRIC_TEMPLATE % (flow_id, dag_id, operator_id)
-    #     ).json()
-
-    #     result = GetOperatorWithArtifactNodeResponse(**resp)
-
-    #     artifact_id = result.artifact_id
-
-    #     resp = self.get_response(self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id)).json()
-    #     results = resp["results"]
-    #     # One of these should be correct for the DAG run and can get result content.
-    #     for artifact_result in results:
-    #         resp = self.get_response(
-    #             self.GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id, artifact_result["id"])
-    #         ).json()
+    def test_endpoint_node_metric_result_content_get(self):
+        flow_id, _ = self.flows["flow_with_metrics_and_checks"]
+        flow = self.client.flow(flow_id)
+        workflow_resp = flow._get_workflow_resp()
+        dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
+        dag_result_id = workflow_resp.workflow_dag_results[0].id
+
+        dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
+            flow_id,
+            dag_result_id,
+        )
+        operator_ids = [
+            id
+            for id in dag_result_resp.operators.keys()
+            if dag_result_resp.operators[id].spec.metric
+        ]
+        operator_id = str(operator_ids[0])
+
+        resp = self.get_response(
+            self.GET_NODE_METRIC_TEMPLATE % (flow_id, dag_id, operator_id)
+        ).json()
+
+        result = GetOperatorWithArtifactNodeResponse(**resp)
+
+        artifact_id = result.artifact_id
+
+        resp = self.get_response(
+            self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id)
+        ).json()
+        results = resp["results"]
+        # Result content should be retrievable for each listed artifact result.
+        for artifact_result in results:
+            resp = self.get_response(
+                self.GET_NODE_METRIC_RESULT_CONTENT_TEMPLATE
+                % (flow_id, dag_id, operator_id, artifact_result["id"])
+            )
+            assert resp.ok
+            resp_obj = GetNodeResultContentResponse(**resp.json())
+            # Each of these results should be fetched successfully with non-empty content.
+            assert not resp_obj.is_downsampled
+            assert len(resp_obj.content) > 0
 
     def test_endpoint_node_check_get(self):
         flow_id, _ = self.flows["flow_with_metrics_and_checks"]
@@ -672,34 +708,44 @@ def test_endpoint_node_check_get(self):
         assert len(result.inputs) == 1
         assert len(result.outputs) == 0
 
-    # TODO: ENG-2943 Investigate output
-    # >> {"error":"Unexpected error occurred when retrieving workflow dag.\nQuery returned no rows."}
-    # def test_endpoint_node_check_result_content_get(self):
-    #     flow_id, _ = self.flows["flow_with_metrics_and_checks"]
-    #     flow = self.client.flow(flow_id)
-    #     workflow_resp = flow._get_workflow_resp()
-    #     dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
-    #     dag_result_id = workflow_resp.workflow_dag_results[0].id
-
-    #     dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
-    #         flow_id,
-    #         dag_result_id,
-    #     )
-    #     operator_ids = [id for id in dag_result_resp.operators.keys() if dag_result_resp.operators[id].spec.check]
-    #     operator_id = str(operator_ids[0])
-
-    #     resp = self.get_response(
-    #         self.GET_NODE_CHECK_TEMPLATE % (flow_id, dag_id, operator_id)
-    #     ).json()
-
-    #     result = GetOperatorWithArtifactNodeResponse(**resp)
-
-    #     artifact_id = result.artifact_id
-
-    #     resp = self.get_response(self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id)).json()
-    #     results = resp["results"]
-    #     # One of these should be correct for the DAG run and can get result content.
-    #     for artifact_result in results:
-    #         resp = self.get_response(
-    #             self.GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE % (flow_id, dag_id, operator_id, artifact_result["id"])
-    #         ).json()
+    def test_endpoint_node_check_result_content_get(self):
+        flow_id, _ = self.flows["flow_with_metrics_and_checks"]
+        flow = self.client.flow(flow_id)
+        workflow_resp = flow._get_workflow_resp()
+        dag_id = workflow_resp.workflow_dag_results[0].workflow_dag_id
+        dag_result_id = workflow_resp.workflow_dag_results[0].id
+
+        dag_result_resp = globals.__GLOBAL_API_CLIENT__.get_workflow_dag_result(
+            flow_id,
+            dag_result_id,
+        )
+        operator_ids = [
+            id
+            for id in dag_result_resp.operators.keys()
+            if dag_result_resp.operators[id].spec.check
+        ]
+        operator_id = str(operator_ids[0])
+
+        resp = self.get_response(
+            self.GET_NODE_CHECK_TEMPLATE % (flow_id, dag_id, operator_id)
+        ).json()
+
+        result = GetOperatorWithArtifactNodeResponse(**resp)
+
+        artifact_id = result.artifact_id
+
+        resp = self.get_response(
+            self.LIST_ARTIFACT_RESULTS_TEMPLATE % (flow_id, artifact_id)
+        ).json()
+        results = resp["results"]
+        # Result content should be retrievable for each listed artifact result.
+        for artifact_result in results:
+            resp = self.get_response(
+                self.GET_NODE_CHECK_RESULT_CONTENT_TEMPLATE
+                % (flow_id, dag_id, operator_id, artifact_result["id"])
+            )
+            assert resp.ok
+            resp_obj = GetNodeResultContentResponse(**resp.json())
+            # Each of these results should be fetched successfully with non-empty content.
+            assert not resp_obj.is_downsampled
+            assert len(resp_obj.content) > 0
diff --git a/sdk/aqueduct/models/response_models.py b/sdk/aqueduct/models/response_models.py
index 5ad84a72a..46cc8e3d8 100644
--- a/sdk/aqueduct/models/response_models.py
+++ b/sdk/aqueduct/models/response_models.py
@@ -9,7 +9,7 @@
     SerializationType,
 )
 from aqueduct.models.artifact import ArtifactMetadata
-from aqueduct.models.dag import Metadata, RetentionPolicy, Schedule
+from aqueduct.models.dag import EngineConfig, Metadata, RetentionPolicy, Schedule
 from aqueduct.models.execution_state import ExecutionState
 from aqueduct.models.operators import LoadSpec, Operator, OperatorSpec
 from aqueduct.models.utils import human_readable_timestamp
@@ -23,6 +23,18 @@ class ArtifactResult(BaseModel):
 
 
 # V2 Responses
+class GetDagResponse(BaseModel):
+    id: uuid.UUID
+    workflow_id: uuid.UUID
+    created_at: str
+    engine_config: EngineConfig
+
+
+class GetNodeResultContentResponse(BaseModel):
+    is_downsampled: bool
+    content: str
+
+
 class GetDagResultResponse(BaseModel):
     """Represents the result of a single workflow run.
 
diff --git a/src/golang/cmd/server/handler/v2/dags_get.go b/src/golang/cmd/server/handler/v2/dags_get.go
new file mode 100644
index 000000000..1121eb0d1
--- /dev/null
+++ b/src/golang/cmd/server/handler/v2/dags_get.go
@@ -0,0 +1,94 @@
+package v2
+
+import (
+	"context"
+	"net/http"
+
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
+	"github.com/aqueducthq/aqueduct/cmd/server/request/parser"
+	aq_context "github.com/aqueducthq/aqueduct/lib/context"
+	"github.com/aqueducthq/aqueduct/lib/database"
+	"github.com/aqueducthq/aqueduct/lib/functional/slices"
+	"github.com/aqueducthq/aqueduct/lib/models"
+	"github.com/aqueducthq/aqueduct/lib/repos"
+	"github.com/aqueducthq/aqueduct/lib/response"
+	"github.com/dropbox/godropbox/errors"
+	"github.com/google/uuid"
+)
+
+// This file should map directly to
+// src/ui/common/src/handlers/v2/DagsGet.ts
+//
+// Route: /v2/workflow/{workflowId}/dags
+// Method: GET
+// Params:
+//	`workflowId`: ID for `workflow` object
+// Request:
+//	Headers:
+//		`api-key`: user's API Key
+//
+// Response:
+//	Body:
+//		serialized `[]response.DAG`
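+//
+// Example (a sketch, not part of the handler): fetching this endpoint with curl.
+// The port and the AQUEDUCT_API_KEY variable below are placeholders for a locally
+// running server and the user's API key:
+//
+//	curl -H "api-key: $AQUEDUCT_API_KEY" "http://localhost:8080/api/v2/workflow/<workflowID>/dags"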
+
+type dagsGetArgs struct {
+	*aq_context.AqContext
+	workflowID uuid.UUID
+}
+
+type DAGsGetHandler struct {
+	handler.GetHandler
+
+	Database database.Database
+
+	WorkflowRepo repos.Workflow
+	DAGRepo      repos.DAG
+}
+
+func (*DAGsGetHandler) Name() string {
+	return "DAGsGet"
+}
+
+func (h *DAGsGetHandler) Prepare(r *http.Request) (interface{}, int, error) {
+	aqContext, statusCode, err := aq_context.ParseAqContext(r.Context())
+	if err != nil {
+		return nil, statusCode, err
+	}
+
+	workflowID, err := (parser.WorkflowIDParser{}).Parse(r)
+	if err != nil {
+		return nil, http.StatusBadRequest, err
+	}
+
+	return &dagsGetArgs{
+		AqContext:  aqContext,
+		workflowID: workflowID,
+	}, http.StatusOK, nil
+}
+
+func (h *DAGsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*dagsGetArgs)
+
+	ok, err := h.WorkflowRepo.ValidateOrg(
+		ctx,
+		args.workflowID,
+		args.OrgID,
+		h.Database,
+	)
+	if err != nil {
+		return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error during workflow ownership validation.")
+	}
+
+	if !ok {
+		return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
+	}
+
+	dbDAGs, err := h.DAGRepo.GetByWorkflow(ctx, args.workflowID, h.Database)
+	if err != nil {
+		return nil, http.StatusInternalServerError, errors.Wrap(err, "Unexpected error reading dag results.")
+	}
+
+	return slices.Map(dbDAGs, func(dbDAG models.DAG) response.DAG {
+		return *response.NewDAGFromDBObject(&dbDAG)
+	}), http.StatusOK, nil
+}
diff --git a/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go b/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go
index e8c72bc06..c65e859bf 100644
--- a/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go
+++ b/src/golang/cmd/server/handler/v2/node_artifact_result_content_get.go
@@ -2,7 +2,6 @@ package v2
 
 import (
 	"context"
-	"mime/multipart"
 	"net/http"
 
 	"github.com/aqueducthq/aqueduct/cmd/server/handler"
@@ -17,11 +16,6 @@ import (
 	"github.com/google/uuid"
 )
 
-const (
-	formIsDownsampledField = "is_downsampled"
-	formContentField       = "content"
-)
-
 // This file should map directly to
 // src/ui/common/src/handlers/v2/NodeArtifactResultContentGet.tsx
 //
@@ -106,9 +100,9 @@ func (h *NodeArtifactResultContentGetHandler) Perform(ctx context.Context, inter
 	args := interfaceArgs.(*nodeResultGetArgs)
 	emptyResp := &nodeResultGetResponse{}
 
-	dag, err := h.DAGRepo.GetByDAGResult(
+	dag, err := h.DAGRepo.Get(
 		ctx,
-		args.nodeResultID,
+		args.dagID,
 		h.Database,
 	)
 	if err != nil {
@@ -160,54 +154,8 @@ func (h *NodeArtifactResultContentGetHandler) Perform(ctx context.Context, inter
 			return emptyResp, http.StatusOK, nil
 		}
 
-		return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.")
-	} else if !errors.Is(err, storage.ErrObjectDoesNotExist()) {
 		return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.")
 	}
 
 	return &nodeResultGetResponse{IsDownsampled: isDownsampled, Content: data}, http.StatusOK, nil
 }
-
-// This custom implementation of SendResponse constructs a multipart form response with two fields:
-// 1: "metadata" contains a json serialized blob of artifact result metadata.
-// 2: "data" contains the artifact result data blob generated the serialization method
-// specified in the metadata field.
-func (*NodeArtifactResultContentGetHandler) SendResponse(w http.ResponseWriter, interfaceResp interface{}) {
-	resp := interfaceResp.(*nodeResultGetResponse)
-	multipartWriter := multipart.NewWriter(w)
-	defer multipartWriter.Close()
-
-	w.Header().Set("Content-Type", multipartWriter.FormDataContentType())
-
-	// The second argument is the file name, which is redundant but required by the UI to parse the file correctly.
-	formFieldWriter, err := multipartWriter.CreateFormFile(formIsDownsampledField, formIsDownsampledField)
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-
-	if resp.IsDownsampled {
-		_, err = formFieldWriter.Write([]byte{1})
-	} else {
-		_, err = formFieldWriter.Write([]byte{0})
-	}
-
-	if err != nil {
-		http.Error(w, err.Error(), http.StatusInternalServerError)
-		return
-	}
-
-	if len(resp.Content) > 0 {
-		formFieldWriter, err = multipartWriter.CreateFormFile(formContentField, formContentField)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		_, err = formFieldWriter.Write(resp.Content)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-	}
-}
diff --git a/src/golang/cmd/server/handler/v2/node_check_result_content_get.go b/src/golang/cmd/server/handler/v2/node_check_result_content_get.go
index 2879f60bc..764aea447 100644
--- a/src/golang/cmd/server/handler/v2/node_check_result_content_get.go
+++ b/src/golang/cmd/server/handler/v2/node_check_result_content_get.go
@@ -88,9 +88,9 @@ func (h *NodeCheckResultContentGetHandler) Perform(ctx context.Context, interfac
 	args := interfaceArgs.(*nodeResultGetArgs)
 	emptyResp := &nodeResultGetResponse{}
 
-	dag, err := h.DAGRepo.GetByDAGResult(
+	dag, err := h.DAGRepo.Get(
 		ctx,
-		args.nodeResultID,
+		args.dagID,
 		h.Database,
 	)
 	if err != nil {
@@ -148,8 +148,6 @@ func (h *NodeCheckResultContentGetHandler) Perform(ctx context.Context, interfac
 			return emptyResp, http.StatusOK, nil
 		}
 
-		return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.")
-	} else if !errors.Is(err, storage.ErrObjectDoesNotExist()) {
 		return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.")
 	}
 
diff --git a/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go b/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go
index 7f7c9601d..f877d7111 100644
--- a/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go
+++ b/src/golang/cmd/server/handler/v2/node_metric_result_content_get.go
@@ -88,9 +88,9 @@ func (h *NodeMetricResultContentGetHandler) Perform(ctx context.Context, interfa
 	args := interfaceArgs.(*nodeResultGetArgs)
 	emptyResp := &nodeResultGetResponse{}
 
-	dag, err := h.DAGRepo.GetByDAGResult(
+	dag, err := h.DAGRepo.Get(
 		ctx,
-		args.nodeResultID,
+		args.dagID,
 		h.Database,
 	)
 	if err != nil {
@@ -148,8 +148,6 @@ func (h *NodeMetricResultContentGetHandler) Perform(ctx context.Context, interfa
 			return emptyResp, http.StatusOK, nil
 		}
 
-		return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.")
-	} else if !errors.Is(err, storage.ErrObjectDoesNotExist()) {
 		return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Failed to retrieve data for the artifact result.")
 	}
 
diff --git a/src/golang/cmd/server/handler/delete_workflow.go b/src/golang/cmd/server/handler/v2/workflow_delete.go
similarity index 93%
rename from src/golang/cmd/server/handler/delete_workflow.go
rename to src/golang/cmd/server/handler/v2/workflow_delete.go
index da71a13ae..28d805cf8 100644
--- a/src/golang/cmd/server/handler/delete_workflow.go
+++ b/src/golang/cmd/server/handler/v2/workflow_delete.go
@@ -1,4 +1,4 @@
-package handler
+package v2
 
 import (
 	"context"
@@ -7,6 +7,7 @@ import (
 	"net/http"
 	"time"
 
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
 	"github.com/aqueducthq/aqueduct/cmd/server/routes"
 	"github.com/aqueducthq/aqueduct/config"
 	aq_context "github.com/aqueducthq/aqueduct/lib/context"
@@ -37,7 +38,11 @@ type SavedObjectResult struct {
 	Result shared.ExecutionState `json:"exec_state"`
 }
 
-// Route: /workflow/{workflowId}/delete
+// Route:
+//
+//	/v2/workflow/{workflowId}/delete
+//	/workflow/{workflowId}/delete
+//
 // Method: POST
 // Params: workflowId
 // Request:
@@ -45,20 +50,20 @@ type SavedObjectResult struct {
 //	Headers:
 //		`api-key`: user's API Key
 //	Body:
-//		json-serialized `deleteWorkflowInput` object.
+//		json-serialized `workflowDeleteInput` object.
 //
-// Response: json-serialized `deleteWorkflowResponse` object.
+// Response: json-serialized `workflowDeleteResponse` object.
 //
-// The `DeleteWorkflowHandler` does a best effort at deleting a workflow and its dependencies, such as
+// The `WorkflowDeleteHandler` makes a best-effort attempt to delete a workflow and its dependencies, such as
 // k8s resources, Postgres state, and output objects in the user's data warehouse.
-type deleteWorkflowArgs struct {
+type workflowDeleteArgs struct {
 	*aq_context.AqContext
 	WorkflowID     uuid.UUID
 	ExternalDelete map[string][]string
 	Force          bool
 }
 
-type deleteWorkflowInput struct {
+type workflowDeleteInput struct {
 	// This is a map from integration_id to the serialized load spec we want to delete.
 	ExternalDeleteLoadParams map[string][]string `json:"external_delete"`
 	// `Force` serve as a safe-guard for client to confirm the deletion.
@@ -67,14 +72,14 @@ type deleteWorkflowInput struct {
 	Force bool `json:"force"`
 }
 
-type deleteWorkflowResponse struct {
+type workflowDeleteResponse struct {
 	// This is a map from integration_id to a list of `SavedObjectResult`
 	// implying if each object is successfully deleted.
 	SavedObjectDeletionResults map[string][]SavedObjectResult `json:"saved_object_deletion_results"`
 }
 
-type DeleteWorkflowHandler struct {
-	PostHandler
+type WorkflowDeleteHandler struct {
+	handler.PostHandler
 
 	Database   database.Database
 	Engine     engine.Engine
@@ -88,11 +93,11 @@ type DeleteWorkflowHandler struct {
 	ArtifactResultRepo       repos.ArtifactResult
 }
 
-func (*DeleteWorkflowHandler) Name() string {
-	return "DeleteWorkflow"
+func (*WorkflowDeleteHandler) Name() string {
+	return "WorkflowDelete"
 }
 
-func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) {
+func (h *WorkflowDeleteHandler) Prepare(r *http.Request) (interface{}, int, error) {
 	aqContext, statuscode, err := aq_context.ParseAqContext(r.Context())
 	if err != nil {
 		return nil, statuscode, err
@@ -117,7 +122,7 @@ func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, erro
 		return nil, http.StatusBadRequest, errors.New("The organization does not own this workflow.")
 	}
 
-	var input deleteWorkflowInput
+	var input workflowDeleteInput
 	err = json.NewDecoder(r.Body).Decode(&input)
 	if err != nil {
 		return nil, http.StatusBadRequest, errors.Wrap(err, "Unable to parse JSON input.")
@@ -143,7 +148,7 @@ func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, erro
 		}
 	}
 
-	return &deleteWorkflowArgs{
+	return &workflowDeleteArgs{
 		AqContext:      aqContext,
 		WorkflowID:     workflowID,
 		ExternalDelete: externalDelete,
@@ -151,10 +156,10 @@ func (h *DeleteWorkflowHandler) Prepare(r *http.Request) (interface{}, int, erro
 	}, http.StatusOK, nil
 }
 
-func (h *DeleteWorkflowHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
-	args := interfaceArgs.(*deleteWorkflowArgs)
+func (h *WorkflowDeleteHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*workflowDeleteArgs)
 
-	resp := deleteWorkflowResponse{}
+	resp := workflowDeleteResponse{}
 	resp.SavedObjectDeletionResults = map[string][]SavedObjectResult{}
 
 	nameToID := make(map[string]uuid.UUID, len(args.ExternalDelete))
@@ -285,7 +290,7 @@ func (h *DeleteWorkflowHandler) Perform(ctx context.Context, interfaceArgs inter
 
 func DeleteSavedObject(
 	ctx context.Context,
-	args *deleteWorkflowArgs,
+	args *workflowDeleteArgs,
 	integrationNameToID map[string]uuid.UUID,
 	vaultObject vault.Vault,
 	storageConfig *shared.StorageConfig,
diff --git a/src/golang/cmd/server/handler/list_workflow_objects.go b/src/golang/cmd/server/handler/v2/workflow_objects_get.go
similarity index 92%
rename from src/golang/cmd/server/handler/list_workflow_objects.go
rename to src/golang/cmd/server/handler/v2/workflow_objects_get.go
index 23dfa5e21..af5e017a6 100644
--- a/src/golang/cmd/server/handler/list_workflow_objects.go
+++ b/src/golang/cmd/server/handler/v2/workflow_objects_get.go
@@ -1,9 +1,10 @@
-package handler
+package v2
 
 import (
 	"context"
 	"net/http"
 
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
 	"github.com/aqueducthq/aqueduct/cmd/server/routes"
 	aq_context "github.com/aqueducthq/aqueduct/lib/context"
 	"github.com/aqueducthq/aqueduct/lib/database"
@@ -20,7 +21,8 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-// Route: /workflow/{workflowId}/objects
+// Route:
+//	/v2/workflow/{workflowId}/objects
 // Method: GET
 // Params:
 //	`workflowId`: ID for `workflow` object
@@ -31,17 +33,17 @@ import (
 //	Body:
 //		all objects written by `workflowId`
 
-type ListWorkflowObjectsArgs struct {
+type WorkflowObjectsGetArgs struct {
 	*aq_context.AqContext
 	workflowId uuid.UUID
 }
 
-type ListWorkflowObjectsResponse struct {
+type WorkflowObjectsGetResponse struct {
 	LoadDetails []views.LoadOperator `json:"object_details"`
 }
 
-type ListWorkflowObjectsHandler struct {
-	GetHandler
+type WorkflowObjectsGetHandler struct {
+	handler.GetHandler
 
 	Database database.Database
 
@@ -51,11 +53,11 @@ type ListWorkflowObjectsHandler struct {
 	ArtifactResultRepo repos.ArtifactResult
 }
 
-func (*ListWorkflowObjectsHandler) Name() string {
-	return "ListWorkflowObjects"
+func (*WorkflowObjectsGetHandler) Name() string {
+	return "WorkflowObjectsGet"
 }
 
-func (h *ListWorkflowObjectsHandler) Prepare(r *http.Request) (interface{}, int, error) {
+func (h *WorkflowObjectsGetHandler) Prepare(r *http.Request) (interface{}, int, error) {
 	aqContext, statusCode, err := aq_context.ParseAqContext(r.Context())
 	if err != nil {
 		return nil, statusCode, err
@@ -80,15 +82,15 @@ func (h *ListWorkflowObjectsHandler) Prepare(r *http.Request) (interface{}, int,
 		return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
 	}
 
-	return &ListWorkflowObjectsArgs{
+	return &WorkflowObjectsGetArgs{
 		AqContext:  aqContext,
 		workflowId: workflowID,
 	}, http.StatusOK, nil
 }
 
-func (h *ListWorkflowObjectsHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
-	args := interfaceArgs.(*ListWorkflowObjectsArgs)
-	emptyResp := ListWorkflowObjectsResponse{}
+func (h *WorkflowObjectsGetHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*WorkflowObjectsGetArgs)
+	emptyResp := WorkflowObjectsGetResponse{}
 
 	saveOpList, err := GetDistinctLoadOpsByWorkflow(
 		ctx,
@@ -102,7 +104,7 @@ func (h *ListWorkflowObjectsHandler) Perform(ctx context.Context, interfaceArgs
 		return emptyResp, http.StatusInternalServerError, err
 	}
 
-	return ListWorkflowObjectsResponse{
+	return WorkflowObjectsGetResponse{
 		LoadDetails: saveOpList,
 	}, http.StatusOK, nil
 }
diff --git a/src/golang/cmd/server/handler/edit_workflow.go b/src/golang/cmd/server/handler/v2/workflow_patch.go
similarity index 88%
rename from src/golang/cmd/server/handler/edit_workflow.go
rename to src/golang/cmd/server/handler/v2/workflow_patch.go
index 7b3714d43..33fd7ab85 100644
--- a/src/golang/cmd/server/handler/edit_workflow.go
+++ b/src/golang/cmd/server/handler/v2/workflow_patch.go
@@ -1,10 +1,11 @@
-package handler
+package v2
 
 import (
 	"context"
 	"encoding/json"
 	"net/http"
 
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
 	"github.com/aqueducthq/aqueduct/cmd/server/routes"
 	aq_context "github.com/aqueducthq/aqueduct/lib/context"
 	"github.com/aqueducthq/aqueduct/lib/database"
@@ -18,7 +19,11 @@ import (
 	"github.com/google/uuid"
 )
 
-// Route: /workflow/{workflowId}/edit
+// Route:
+//
+//	/workflow/{workflowId}/edit
+//	/v2/workflow/{workflowId}/edit
+//
 // Method: POST
 // Params: workflowId
 // Request:
@@ -26,11 +31,11 @@ import (
 //	Headers:
 //		`api-key`: user's API Key
 //	Body:
-//		serialized `editWorkflowInput` object.
+//		serialized `workflowPatchInput` object.
 //
 // Response: none
-type EditWorkflowHandler struct {
-	PostHandler
+type WorkflowPatchHandler struct {
+	handler.PostHandler
 
 	Database database.Database
 	Engine   engine.Engine
@@ -42,7 +47,7 @@ type EditWorkflowHandler struct {
 	WorkflowRepo repos.Workflow
 }
 
-type editWorkflowInput struct {
+type workflowPatchInput struct {
 	WorkflowName         string                       `json:"name"`
 	WorkflowDescription  string                       `json:"description"`
 	Schedule             *shared.Schedule             `json:"schedule"`
@@ -50,7 +55,7 @@ type editWorkflowInput struct {
 	NotificationSettings *shared.NotificationSettings `json:"notification_settings"`
 }
 
-type editWorkflowArgs struct {
+type workflowPatchArgs struct {
 	workflowId           uuid.UUID
 	workflowName         string
 	workflowDescription  string
@@ -59,11 +64,11 @@ type editWorkflowArgs struct {
 	notificationSettings *shared.NotificationSettings
 }
 
-func (*EditWorkflowHandler) Name() string {
-	return "EditWorkflow"
+func (*WorkflowPatchHandler) Name() string {
+	return "WorkflowPatch"
 }
 
-func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) {
+func (h *WorkflowPatchHandler) Prepare(r *http.Request) (interface{}, int, error) {
 	aqContext, statusCode, err := aq_context.ParseAqContext(r.Context())
 	if err != nil {
 		return nil, statusCode, err
@@ -92,7 +97,7 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error)
 		return nil, http.StatusBadRequest, errors.Wrap(err, "The organization does not own this workflow.")
 	}
 
-	var input editWorkflowInput
+	var input workflowPatchInput
 	err = json.NewDecoder(r.Body).Decode(&input)
 	if err != nil {
 		return nil, http.StatusBadRequest, errors.New("Unable to parse JSON input.")
@@ -117,7 +122,7 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error)
 		return nil, http.StatusBadRequest, errors.New("Edit request issued without any updates specified.")
 	}
 
-	return &editWorkflowArgs{
+	return &workflowPatchArgs{
 		workflowId:           workflowID,
 		workflowName:         input.WorkflowName,
 		workflowDescription:  input.WorkflowDescription,
@@ -127,8 +132,8 @@ func (h *EditWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error)
 	}, http.StatusOK, nil
 }
 
-func (h *EditWorkflowHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
-	args := interfaceArgs.(*editWorkflowArgs)
+func (h *WorkflowPatchHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*workflowPatchArgs)
 	txn, err := h.Database.BeginTx(ctx)
 	if err != nil {
 		return nil, http.StatusInternalServerError, errors.Wrap(err, "Unable to update workflow.")
diff --git a/src/golang/cmd/server/handler/refresh_workflow.go b/src/golang/cmd/server/handler/v2/workflow_post.go
similarity index 81%
rename from src/golang/cmd/server/handler/refresh_workflow.go
rename to src/golang/cmd/server/handler/v2/workflow_post.go
index c3a7e7029..2d6c8f9fe 100644
--- a/src/golang/cmd/server/handler/refresh_workflow.go
+++ b/src/golang/cmd/server/handler/v2/workflow_post.go
@@ -1,9 +1,10 @@
-package handler
+package v2
 
 import (
 	"context"
 	"net/http"
 
+	"github.com/aqueducthq/aqueduct/cmd/server/handler"
 	"github.com/aqueducthq/aqueduct/cmd/server/request"
 	"github.com/aqueducthq/aqueduct/cmd/server/routes"
 	aq_context "github.com/aqueducthq/aqueduct/lib/context"
@@ -17,12 +18,16 @@ import (
 	"github.com/google/uuid"
 )
 
-type RefreshWorkflowArgs struct {
+type WorkflowPostArgs struct {
 	WorkflowId uuid.UUID
 	Parameters map[string]param.Param
 }
 
-// Route: /workflow/{workflowId}/refresh
+// Route:
+//
+//	/workflow/{workflowId}/refresh
+//	/v2/workflow/{workflowId}/trigger
+//
 // Method: POST
 // Params: workflowId
 // Request:
@@ -34,8 +39,8 @@ type RefreshWorkflowArgs struct {
 //
 // Refresh workflow creates a new workflow version by
 // triggering running a workflow run.
-type RefreshWorkflowHandler struct {
-	PostHandler
+type WorkflowPostHandler struct {
+	handler.PostHandler
 
 	Database database.Database
 	Engine   engine.Engine
@@ -43,11 +48,11 @@ type RefreshWorkflowHandler struct {
 	WorkflowRepo repos.Workflow
 }
 
-func (*RefreshWorkflowHandler) Name() string {
-	return "RefreshWorkflow"
+func (*WorkflowPostHandler) Name() string {
+	return "WorkflowPost"
 }
 
-func (h *RefreshWorkflowHandler) Prepare(r *http.Request) (interface{}, int, error) {
+func (h *WorkflowPostHandler) Prepare(r *http.Request) (interface{}, int, error) {
 	aqContext, statusCode, err := aq_context.ParseAqContext(r.Context())
 	if err != nil {
 		return nil, statusCode, err
@@ -81,14 +86,14 @@ func (h *RefreshWorkflowHandler) Prepare(r *http.Request) (interface{}, int, err
 		return nil, http.StatusBadRequest, errors.Wrap(err, "The user-defined parameters could not be extracted in current format.")
 	}
 
-	return &RefreshWorkflowArgs{
+	return &WorkflowPostArgs{
 		WorkflowId: workflowID,
 		Parameters: parameters,
 	}, http.StatusOK, nil
 }
 
-func (h *RefreshWorkflowHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
-	args := interfaceArgs.(*RefreshWorkflowArgs)
+func (h *WorkflowPostHandler) Perform(ctx context.Context, interfaceArgs interface{}) (interface{}, int, error) {
+	args := interfaceArgs.(*WorkflowPostArgs)
 
 	emptyResp := struct{}{}
 
diff --git a/src/golang/cmd/server/routes/routes.go b/src/golang/cmd/server/routes/routes.go
index e0e44d6b7..73f097e18 100644
--- a/src/golang/cmd/server/routes/routes.go
+++ b/src/golang/cmd/server/routes/routes.go
@@ -9,6 +9,8 @@ const (
 	ListStorageMigrationRoute      = "/api/v2/storage-migrations"
 	WorkflowsRoute                 = "/api/v2/workflows"
 	WorkflowRoute                  = "/api/v2/workflow/{workflowID}"
+	WorkflowObjectsRoute           = "/api/v2/workflow/{workflowID}/objects"
+	DAGsRoute                      = "/api/v2/workflow/{workflowID}/dags"
 	DAGRoute                       = "/api/v2/workflow/{workflowID}/dag/{dagID}"
 	DAGResultsRoute                = "/api/v2/workflow/{workflowID}/results"
 	DAGResultRoute                 = "/api/v2/workflow/{workflowID}/result/{dagResultID}"
@@ -25,6 +27,16 @@ const (
 	NodeOperatorContentRoute       = "/api/v2/workflow/{workflowID}/dag/{dagID}/node/operator/{nodeID}/content"
 	NodesResultsRoute              = "/api/v2/workflow/{workflowID}/result/{dagResultID}/nodes/results"
 
+	// V2 hacky routes
+	// These routes are supposed to be `/v2/workflow/{workflowId}`
+	// with PATCH (edit) / POST (trigger) / DELETE methods.
+	// However, that requires a significant refactor of the handler interfaces, since we
+	// assume each route is unique per handler.
+	// For now, we simply reuse the same handlers as the corresponding v1 workflow routes.
+	WorkflowEditPostRoute    = "/api/v2/workflow/{workflowId}/edit"
+	WorkflowTriggerPostRoute = "/api/v2/workflow/{workflowId}/trigger"
+	WorkflowDeletePostRoute  = "/api/v2/workflow/{workflowId}/delete"
+
 	// V1 routes
 	GetArtifactVersionsRoute = "/api/artifact/versions"
 	GetArtifactResultRoute   = "/api/artifact/{workflowDagResultId}/{artifactId}/result"
diff --git a/src/golang/cmd/server/server/cron.go b/src/golang/cmd/server/server/cron.go
index caf9f8a62..361613d14 100644
--- a/src/golang/cmd/server/server/cron.go
+++ b/src/golang/cmd/server/server/cron.go
@@ -4,7 +4,7 @@ import (
 	"context"
 	"time"
 
-	"github.com/aqueducthq/aqueduct/cmd/server/handler"
+	v2 "github.com/aqueducthq/aqueduct/cmd/server/handler/v2"
 	"github.com/aqueducthq/aqueduct/config"
 	"github.com/aqueducthq/aqueduct/lib/database"
 	"github.com/aqueducthq/aqueduct/lib/engine"
@@ -40,14 +40,14 @@ func (s *AqServer) triggerMissedCronJobs(
 	if lastExpectedTriggerTime > referenceTime.Unix() {
 		// This means that the workflow should have been triggered, but it wasn't.
 		// So we manually trigger the workflow here.
-		_, _, err := (&handler.RefreshWorkflowHandler{
+		_, _, err := (&v2.WorkflowPostHandler{
 			Database: s.Database,
 			Engine:   s.AqEngine,
 
 			WorkflowRepo: s.WorkflowRepo,
 		}).Perform(
 			ctx,
-			&handler.RefreshWorkflowArgs{
+			&v2.WorkflowPostArgs{
 				WorkflowId: workflowId,
 			},
 		)
diff --git a/src/golang/cmd/server/server/handlers.go b/src/golang/cmd/server/server/handlers.go
index 748842fcd..57d1278a1 100644
--- a/src/golang/cmd/server/server/handlers.go
+++ b/src/golang/cmd/server/server/handlers.go
@@ -23,6 +23,11 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
 			WorkflowRepo: s.WorkflowRepo,
 			OperatorRepo: s.OperatorRepo,
 		},
+		routes.DAGsRoute: &v2.DAGsGetHandler{
+			Database:     s.Database,
+			WorkflowRepo: s.WorkflowRepo,
+			DAGRepo:      s.DAGRepo,
+		},
 		routes.DAGResultRoute: &v2.DAGResultGetHandler{
 			Database:      s.Database,
 			WorkflowRepo:  s.WorkflowRepo,
@@ -164,7 +169,7 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
 			StorageMigrationRepo:     s.StorageMigrationRepo,
 			WorkflowRepo:             s.WorkflowRepo,
 		},
-		routes.DeleteWorkflowRoute: &handler.DeleteWorkflowHandler{
+		routes.DeleteWorkflowRoute: &v2.WorkflowDeleteHandler{
 			Database:   s.Database,
 			Engine:     s.AqEngine,
 			JobManager: s.JobManager,
@@ -182,7 +187,17 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
 
 			IntegrationRepo: s.IntegrationRepo,
 		},
-		routes.EditWorkflowRoute: &handler.EditWorkflowHandler{
+		routes.EditWorkflowRoute: &v2.WorkflowPatchHandler{
+			Database: s.Database,
+			Engine:   s.AqEngine,
+
+			ArtifactRepo: s.ArtifactRepo,
+			DAGRepo:      s.DAGRepo,
+			DAGEdgeRepo:  s.DAGEdgeRepo,
+			OperatorRepo: s.OperatorRepo,
+			WorkflowRepo: s.WorkflowRepo,
+		},
+		routes.WorkflowEditPostRoute: &v2.WorkflowPatchHandler{
 			Database: s.Database,
 			Engine:   s.AqEngine,
 
@@ -243,7 +258,15 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
 			OperatorResultRepo: s.OperatorResultRepo,
 		},
 		routes.GetUserProfileRoute: &handler.GetUserProfileHandler{},
-		routes.ListWorkflowObjectsRoute: &handler.ListWorkflowObjectsHandler{
+		routes.ListWorkflowObjectsRoute: &v2.WorkflowObjectsGetHandler{
+			Database: s.Database,
+
+			OperatorRepo:       s.OperatorRepo,
+			WorkflowRepo:       s.WorkflowRepo,
+			WorkflowDagRepo:    s.DAGRepo,
+			ArtifactResultRepo: s.ArtifactResultRepo,
+		},
+		routes.WorkflowObjectsRoute: &v2.WorkflowObjectsGetHandler{
 			Database: s.Database,
 
 			OperatorRepo:       s.OperatorRepo,
@@ -375,7 +398,13 @@ func (s *AqServer) Handlers() map[string]handler.Handler {
 
 			IntegrationRepo: s.IntegrationRepo,
 		},
-		routes.RefreshWorkflowRoute: &handler.RefreshWorkflowHandler{
+		routes.RefreshWorkflowRoute: &v2.WorkflowPostHandler{
+			Database: s.Database,
+			Engine:   s.AqEngine,
+
+			WorkflowRepo: s.WorkflowRepo,
+		},
+		routes.WorkflowTriggerPostRoute: &v2.WorkflowPostHandler{
 			Database: s.Database,
 			Engine:   s.AqEngine,
 
diff --git a/src/golang/lib/response/node.go b/src/golang/lib/response/node.go
index f0be463df..26289c6d3 100644
--- a/src/golang/lib/response/node.go
+++ b/src/golang/lib/response/node.go
@@ -120,6 +120,7 @@ func NewArtifactFromDBObject(dbArtifactNode *views.ArtifactNode) *Artifact {
 type ArtifactResult struct {
 	// Contains only the `result`. It mostly mirrors 'artifact_result' schema.
 	ID                uuid.UUID                        `json:"id"`
+	ArtifactID        uuid.UUID                        `json:"artifact_id"`
 	SerializationType shared.ArtifactSerializationType `json:"serialization_type"`
 
 	// If `ContentSerialized` is set, the content is small and we directly send
@@ -140,6 +141,7 @@ func NewArtifactResultFromDBObject(
 ) *ArtifactResult {
 	result := &ArtifactResult{
 		ID:                dbArtifactResult.ID,
+		ArtifactID:        dbArtifactResult.ArtifactID,
 		SerializationType: dbArtifactResult.Metadata.SerializationType,
 		ContentPath:       dbArtifactResult.ContentPath,
 		ContentSerialized: content,
@@ -179,14 +181,19 @@ func NewOperatorFromDBObject(dbOperatorNode *views.OperatorNode) *Operator {
 
 type OperatorResult struct {
 	// Contains only the `result`. It mostly mirrors 'operator_result' schema.
-	ID        uuid.UUID              `json:"id"`
-	ExecState *shared.ExecutionState `json:"exec_state"`
+	ID         uuid.UUID              `json:"id"`
+	OperatorID uuid.UUID              `json:"operator_id"`
+	ExecState  *shared.ExecutionState `json:"exec_state"`
 }
 
 func NewOperatorResultFromDBObject(
 	dbOperatorResult *models.OperatorResult,
 ) *OperatorResult {
-	result := &OperatorResult{ID: dbOperatorResult.ID}
+	result := &OperatorResult{
+		ID:         dbOperatorResult.ID,
+		OperatorID: dbOperatorResult.OperatorID,
+	}
+
 	if !dbOperatorResult.ExecState.IsNull {
 		// make a copy of execState's value
 		execStateVal := dbOperatorResult.ExecState.ExecutionState
diff --git a/src/ui/common/src/components/pages/workflow/id/hook.ts b/src/ui/common/src/components/pages/workflow/id/hook.ts
index 096862d1c..de77130a8 100644
--- a/src/ui/common/src/components/pages/workflow/id/hook.ts
+++ b/src/ui/common/src/components/pages/workflow/id/hook.ts
@@ -3,8 +3,13 @@ import { useDispatch, useSelector } from 'react-redux';
 import { useParams } from 'react-router-dom';
 
 import { BreadcrumbLink } from '../../../../components/layouts/NavBar';
+import {
+  useNodesGetQuery,
+  useNodesResultsGetQuery,
+} from '../../../../handlers/AqueductApi';
 import { handleGetWorkflowDag } from '../../../../handlers/getWorkflowDag';
 import { handleGetWorkflowDagResult } from '../../../../handlers/getWorkflowDagResult';
+import { NodeResultsMap, NodesMap } from '../../../../handlers/responses/node';
 import { WorkflowDagResultWithLoadingStatus } from '../../../../reducers/workflowDagResults';
 import { WorkflowDagWithLoadingStatus } from '../../../../reducers/workflowDags';
 import { AppDispatch, RootState } from '../../../../stores/store';
@@ -107,3 +112,40 @@ export default function useWorkflow(
     workflowDagResultWithLoadingStatus,
   };
 }
+
+export function useWorkflowNodes(
+  apiKey: string,
+  workflowId: string,
+  dagId: string | undefined
+): NodesMap {
+  const { data: nodes } = useNodesGetQuery(
+    { apiKey, workflowId, dagId },
+    { skip: !workflowId || !dagId }
+  );
+  return {
+    operators: Object.fromEntries(
+      (nodes?.operators ?? []).map((op) => [op.id, op])
+    ),
+    artifacts: Object.fromEntries(
+      (nodes?.artifacts ?? []).map((artf) => [artf.id, artf])
+    ),
+  };
+}
+export function useWorkflowNodesResults(
+  apiKey: string,
+  workflowId: string,
+  dagResultId: string | undefined
+): NodeResultsMap {
+  const { data: nodeResults } = useNodesResultsGetQuery(
+    { apiKey, workflowId, dagResultId },
+    { skip: !workflowId || !dagResultId }
+  );
+  return {
+    operators: Object.fromEntries(
+      (nodeResults?.operators ?? []).map((op) => [op.operator_id, op])
+    ),
+    artifacts: Object.fromEntries(
+      (nodeResults?.artifacts ?? []).map((artf) => [artf.artifact_id, artf])
+    ),
+  };
+}
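+
+// A minimal usage sketch: wiring both hooks together in a workflow page component,
+// assuming `apiKey`, `workflowId`, `dagId`, `dagResultId`, and `someOperatorId` are
+// available from component state (placeholder names, not part of this module):
+//
+//   const nodes = useWorkflowNodes(apiKey, workflowId, dagId);
+//   const nodeResults = useWorkflowNodesResults(apiKey, workflowId, dagResultId);
+//   const operatorResult = nodeResults.operators[someOperatorId];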
diff --git a/src/ui/common/src/handlers/AqueductApi.ts b/src/ui/common/src/handlers/AqueductApi.ts
index bc3c6972d..e9e902f85 100644
--- a/src/ui/common/src/handlers/AqueductApi.ts
+++ b/src/ui/common/src/handlers/AqueductApi.ts
@@ -18,6 +18,7 @@ import {
   DagResultsGetRequest,
   DagResultsGetResponse,
 } from './v2/DagResultsGet';
+import { dagsGetQuery, DagsGetRequest, DagsGetResponse } from './v2/DagsGet';
 import {
   integrationOperatorsGetQuery,
   IntegrationOperatorsGetRequest,
@@ -93,16 +94,36 @@ import {
   NodesResultsGetRequest,
   NodesResultsGetResponse,
 } from './v2/NodesResultsGet';
+import {
+  workflowDeletePostQuery,
+  WorkflowDeletePostRequest,
+  WorkflowDeletePostResponse,
+} from './v2/WorkflowDeletePost';
+import {
+  workflowEditPostQuery,
+  WorkflowEditPostRequest,
+  WorkflowEditPostResponse,
+} from './v2/WorkflowEditPost';
 import {
   workflowGetQuery,
   WorkflowGetRequest,
   WorkflowGetResponse,
 } from './v2/WorkflowGet';
+import {
+  workflowObjectsGetQuery,
+  WorkflowObjectsGetRequest,
+  WorkflowObjectsGetResponse,
+} from './v2/WorkflowObjectsGet';
 import {
   workflowsGetQuery,
   WorkflowsGetRequest,
   WorkflowsGetResponse,
 } from './v2/WorkflowsGet';
+import {
+  workflowTriggerPostQuery,
+  WorkflowTriggerPostRequest,
+  WorkflowTriggerPostResponse,
+} from './v2/WorkflowTriggerPost';
 
 const { createApi, fetchBaseQuery } = ((rtkQueryRaw as any).default ??
   rtkQueryRaw) as typeof rtkQueryRaw;
@@ -126,6 +147,10 @@ export const aqueductApi = createApi({
       query: (req) => dagOperatorsGetQuery(req),
       transformErrorResponse,
     }),
+    dagsGet: builder.query<DagsGetResponse, DagsGetRequest>({
+      query: (req) => dagsGetQuery(req),
+      transformErrorResponse,
+    }),
     dagResultGet: builder.query<DagResultGetResponse, DagResultGetRequest>({
       query: (req) => dagResultGetQuery(req),
       transformErrorResponse,
@@ -230,6 +255,34 @@ export const aqueductApi = createApi({
       query: (req) => storageMigrationListQuery(req),
       transformErrorResponse,
     }),
+    workflowDeletePost: builder.mutation<
+      WorkflowDeletePostResponse,
+      WorkflowDeletePostRequest
+    >({
+      query: (req) => workflowDeletePostQuery(req),
+      transformErrorResponse: transformErrorResponse,
+    }),
+    workflowEditPost: builder.mutation<
+      WorkflowEditPostResponse,
+      WorkflowEditPostRequest
+    >({
+      query: (req) => workflowEditPostQuery(req),
+      transformErrorResponse: transformErrorResponse,
+    }),
+    workflowTriggerPost: builder.mutation<
+      WorkflowTriggerPostResponse,
+      WorkflowTriggerPostRequest
+    >({
+      query: (req) => workflowTriggerPostQuery(req),
+      transformErrorResponse: transformErrorResponse,
+    }),
+    workflowObjectsGet: builder.query<
+      WorkflowObjectsGetResponse,
+      WorkflowObjectsGetRequest
+    >({
+      query: (req) => workflowObjectsGetQuery(req),
+      transformErrorResponse,
+    }),
     workflowsGet: builder.query<WorkflowsGetResponse, WorkflowsGetRequest>({
       query: (req) => workflowsGetQuery(req),
       transformErrorResponse: transformErrorResponse,
@@ -243,6 +296,7 @@ export const aqueductApi = createApi({
 
 export const {
   useDagGetQuery,
+  useDagsGetQuery,
   useDagOperatorsGetQuery,
   useDagResultGetQuery,
   useDagResultsGetQuery,
@@ -262,5 +316,9 @@ export const {
   useNodesGetQuery,
   useNodesResultsGetQuery,
   useWorkflowGetQuery,
+  useWorkflowObjectsGetQuery,
   useWorkflowsGetQuery,
+  useWorkflowDeletePostMutation,
+  useWorkflowEditPostMutation,
+  useWorkflowTriggerPostMutation,
 } = aqueductApi;
diff --git a/src/ui/common/src/handlers/responses/node.ts b/src/ui/common/src/handlers/responses/node.ts
index a217e39e7..065afd011 100644
--- a/src/ui/common/src/handlers/responses/node.ts
+++ b/src/ui/common/src/handlers/responses/node.ts
@@ -39,6 +39,7 @@ export type ArtifactResponse = {
 
 export type ArtifactResultResponse = {
   id: string;
+  artifact_id: string;
   serialization_type: SerializationType;
   content_path: string;
   content_serialized: string;
@@ -57,6 +58,7 @@ export type OperatorResponse = {
 
 export type OperatorResultResponse = {
   id: string;
+  operator_id: string;
   exec_state?: ExecState;
 };
 
@@ -76,6 +78,16 @@ export type NodeResultsResponse = {
   // checks: OperatorWithArtifactNodeResultResponse[];
 };
 
+export type NodesMap = {
+  operators: { [id: string]: OperatorResponse };
+  artifacts: { [id: string]: ArtifactResponse };
+};
+
+export type NodeResultsMap = {
+  operators: { [id: string]: OperatorResultResponse };
+  artifacts: { [id: string]: ArtifactResultResponse };
+};
+
 export type NodeContentResponse = {
   name: string;
   data: string;
diff --git a/src/ui/common/src/handlers/v2/DagsGet.ts b/src/ui/common/src/handlers/v2/DagsGet.ts
new file mode 100644
index 000000000..6d6901fba
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/DagsGet.ts
@@ -0,0 +1,15 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/dags_get.go
+
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+import { DagResponse } from '../responses/workflow';
+
+export type DagsGetRequest = APIKeyParameter & WorkflowIdParameter;
+
+export type DagsGetResponse = DagResponse[];
+
+export const dagsGetQuery = (req: DagsGetRequest) => ({
+  url: `workflow/${req.workflowId}/dags`,
+  headers: { 'api-key': req.apiKey },
+});
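+
+// A minimal usage sketch (not part of the handler contract): consuming this endpoint
+// through the generated RTK Query hook, assuming `apiKey` and `workflowId` are in scope.
+//
+//   const { data: dags, isLoading } = useDagsGetQuery({ apiKey, workflowId });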
diff --git a/src/ui/common/src/handlers/v2/NodeArtifactGet.ts b/src/ui/common/src/handlers/v2/NodeArtifactGet.ts
index 206c9dbf2..97c65fd47 100644
--- a/src/ui/common/src/handlers/v2/NodeArtifactGet.ts
+++ b/src/ui/common/src/handlers/v2/NodeArtifactGet.ts
@@ -7,7 +7,7 @@ import {
   NodeIdParameter,
   WorkflowIdParameter,
 } from '../parameters/Path';
-import { ArtifactResponse } from '../responses/Node';
+import { ArtifactResponse } from '../responses/node';
 
 export type NodeArtifactGetRequest = APIKeyParameter &
   DagIdParameter &
diff --git a/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts b/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts
index bde579895..0c24d9631 100644
--- a/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts
+++ b/src/ui/common/src/handlers/v2/NodeArtifactResultsGet.ts
@@ -7,7 +7,7 @@ import {
   NodeIdParameter,
   WorkflowIdParameter,
 } from '../parameters/Path';
-import { ArtifactResultResponse } from '../responses/Node';
+import { ArtifactResultResponse } from '../responses/node';
 
 export type NodeArtifactResultsGetRequest = APIKeyParameter &
   DagIdParameter &
diff --git a/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts b/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts
index a3e3e7f20..118f35528 100644
--- a/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts
+++ b/src/ui/common/src/handlers/v2/NodeOperatorContentGet.ts
@@ -7,7 +7,7 @@ import {
   NodeIdParameter,
   WorkflowIdParameter,
 } from '../parameters/Path';
-import { NodeContentResponse } from '../responses/Node';
+import { NodeContentResponse } from '../responses/node';
 
 export type NodeOperatorContentGetRequest = APIKeyParameter &
   DagIdParameter &
diff --git a/src/ui/common/src/handlers/v2/NodeOperatorGet.ts b/src/ui/common/src/handlers/v2/NodeOperatorGet.ts
index 783778a67..852392ff0 100644
--- a/src/ui/common/src/handlers/v2/NodeOperatorGet.ts
+++ b/src/ui/common/src/handlers/v2/NodeOperatorGet.ts
@@ -7,7 +7,7 @@ import {
   NodeIdParameter,
   WorkflowIdParameter,
 } from '../parameters/Path';
-import { OperatorResponse } from '../responses/Node';
+import { OperatorResponse } from '../responses/node';
 
 export type NodeOperatorGetRequest = APIKeyParameter &
   DagIdParameter &
diff --git a/src/ui/common/src/handlers/v2/NodesGet.ts b/src/ui/common/src/handlers/v2/NodesGet.ts
index ddbe9e179..7f5c8ae60 100644
--- a/src/ui/common/src/handlers/v2/NodesGet.ts
+++ b/src/ui/common/src/handlers/v2/NodesGet.ts
@@ -3,7 +3,7 @@
 
 import { APIKeyParameter } from '../parameters/Header';
 import { DagIdParameter, WorkflowIdParameter } from '../parameters/Path';
-import { NodesResponse } from '../responses/Node';
+import { NodesResponse } from '../responses/node';
 
 export type NodesGetRequest = APIKeyParameter &
   DagIdParameter &
diff --git a/src/ui/common/src/handlers/v2/NodesResultsGet.ts b/src/ui/common/src/handlers/v2/NodesResultsGet.ts
index 9e5cd327f..fd551534f 100644
--- a/src/ui/common/src/handlers/v2/NodesResultsGet.ts
+++ b/src/ui/common/src/handlers/v2/NodesResultsGet.ts
@@ -3,7 +3,7 @@
 
 import { APIKeyParameter } from '../parameters/Header';
 import { DagResultIdParameter, WorkflowIdParameter } from '../parameters/Path';
-import { NodeResultsResponse } from '../responses/Node';
+import { NodeResultsResponse } from '../responses/node';
 
 export type NodesResultsGetRequest = APIKeyParameter &
   DagResultIdParameter &
diff --git a/src/ui/common/src/handlers/v2/WorkflowDeletePost.ts b/src/ui/common/src/handlers/v2/WorkflowDeletePost.ts
new file mode 100644
index 000000000..565e30305
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/WorkflowDeletePost.ts
@@ -0,0 +1,26 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/workflow_delete.go
+
+import { SavedObjectDeletion } from '../../utils/workflows';
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowDeletePostRequest = APIKeyParameter &
+  WorkflowIdParameter & {
+    external_delete: { [integration_id: string]: string[] };
+    force: boolean;
+  };
+
+export type WorkflowDeletePostResponse = {
+  [id: string]: SavedObjectDeletion;
+};
+
+export const workflowDeletePostQuery = (req: WorkflowDeletePostRequest) => ({
+  url: `workflow/${req.workflowId}/delete`,
+  method: 'POST',
+  headers: { 'api-key': req.apiKey },
+  body: {
+    external_delete: req.external_delete,
+    force: req.force,
+  },
+});
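+
+// A minimal usage sketch: invoking the generated mutation hook from AqueductApi, where
+// `apiKey` and `workflowId` are assumed to be in scope and `force: true` confirms deletion.
+//
+//   const [deleteWorkflow] = useWorkflowDeletePostMutation();
+//   await deleteWorkflow({ apiKey, workflowId, external_delete: {}, force: true }).unwrap();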
diff --git a/src/ui/common/src/handlers/v2/WorkflowEditPost.ts b/src/ui/common/src/handlers/v2/WorkflowEditPost.ts
new file mode 100644
index 000000000..6f4ee5522
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/WorkflowEditPost.ts
@@ -0,0 +1,34 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/workflow_patch.go
+
+import {
+  NotificationSettings,
+  RetentionPolicy,
+  WorkflowSchedule,
+} from '../../utils/workflows';
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowEditPostRequest = APIKeyParameter &
+  WorkflowIdParameter & {
+    name: string;
+    description: string;
+    schedule: WorkflowSchedule;
+    retention_policy: RetentionPolicy;
+    notification_settings: NotificationSettings;
+  };
+
+export type WorkflowEditPostResponse = Record<string, never>;
+
+export const workflowEditPostQuery = (req: WorkflowEditPostRequest) => ({
+  url: `workflow/${req.workflowId}/edit`,
+  method: 'POST',
+  headers: { 'api-key': req.apiKey },
+  body: {
+    name: req.name,
+    description: req.description,
+    schedule: req.schedule,
+    retention_policy: req.retention_policy,
+    notification_settings: req.notification_settings,
+  },
+});
diff --git a/src/ui/common/src/handlers/v2/WorkflowGet.ts b/src/ui/common/src/handlers/v2/WorkflowGet.ts
index 45e32f7be..bd81870a5 100644
--- a/src/ui/common/src/handlers/v2/WorkflowGet.ts
+++ b/src/ui/common/src/handlers/v2/WorkflowGet.ts
@@ -2,7 +2,7 @@
 // src/golang/cmd/server/handler/v2/workflow_get.go
 import { APIKeyParameter } from '../parameters/Header';
 import { WorkflowIdParameter } from '../parameters/Path';
-import { WorkflowResponse } from '../responses/Workflow';
+import { WorkflowResponse } from '../responses/workflow';
 
 export type WorkflowGetRequest = APIKeyParameter & WorkflowIdParameter;
 
diff --git a/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts b/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts
new file mode 100644
index 000000000..7a240229c
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/WorkflowObjectsGet.ts
@@ -0,0 +1,16 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/workflow_objects_get.go
+
+import { SavedObject } from '../../utils/workflows';
+
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowObjectsGetRequest = APIKeyParameter & WorkflowIdParameter;
+
+export type WorkflowObjectsGetResponse = { object_details: SavedObject[] };
+
+export const workflowObjectsGetQuery = (req: WorkflowObjectsGetRequest) => ({
+  url: `workflow/${req.workflowId}/objects`,
+  headers: { 'api-key': req.apiKey },
+});
diff --git a/src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts b/src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts
new file mode 100644
index 000000000..47242e3b9
--- /dev/null
+++ b/src/ui/common/src/handlers/v2/WorkflowTriggerPost.ts
@@ -0,0 +1,29 @@
+// This file should map exactly to
+// src/golang/cmd/server/handler/v2/workflow_post.go
+
+import { APIKeyParameter } from '../parameters/Header';
+import { WorkflowIdParameter } from '../parameters/Path';
+
+export type WorkflowTriggerPostRequest = APIKeyParameter &
+  WorkflowIdParameter & {
+    serializedParams: string;
+  };
+
+export type WorkflowTriggerPostResponse = Record<string, never>;
+
+export const workflowTriggerPostQuery = (req: WorkflowTriggerPostRequest) => {
+  const parameters = new FormData();
+  parameters.append('parameters', req.serializedParams);
+  return {
+    url: `workflow/${req.workflowId}/trigger`,
+    method: 'POST',
+    // avoid built-in content-type override
+    // ref: https://github.com/reduxjs/redux-toolkit/issues/2287
+    prepareHeaders: (headers) => {
+      headers.set('api-key', req.apiKey);
+      headers.set('Content-Type', 'multipart/form-data');
+      return headers;
+    },
+    body: parameters,
+  };
+};
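+
+// A minimal usage sketch: triggering a run through the generated mutation hook, where
+// `serializedParams` is assumed to be a JSON-serialized parameter map ('{}' when empty).
+//
+//   const [triggerWorkflow] = useWorkflowTriggerPostMutation();
+//   await triggerWorkflow({ apiKey, workflowId, serializedParams: '{}' }).unwrap();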