From de47185e3fd087c7a24f66b0cbfb1d4c1a51bd2c Mon Sep 17 00:00:00 2001 From: hadesy <6346047+hadesy@users.noreply.github.com> Date: Wed, 6 Jul 2022 19:15:21 +0800 Subject: [PATCH 1/2] fix: support RemainingItemCount in archivedWrokflow Signed-off-by: hadesy <6346047+hadesy@users.noreply.github.com> --- persist/sqldb/mocks/WorkflowArchive.go | 20 +++++++++++++ persist/sqldb/null_workflow_archive.go | 4 +++ persist/sqldb/workflow_archive.go | 29 +++++++++++++++++++ .../archived_workflow_server.go | 21 ++++++++++++++ .../archived_workflow_server_test.go | 7 +++++ 5 files changed, 81 insertions(+) diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index f71f3687919f..22fe71a78191 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -118,6 +118,26 @@ func (_m *WorkflowArchive) ListWorkflows(namespace string, name string, namePref return r0, r1 } +func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) { + ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements) error); ok { + r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ListWorkflowsLabelKeys provides a mock function with given fields: func (_m *WorkflowArchive) ListWorkflowsLabelKeys() (*v1alpha1.LabelKeys, error) { ret := _m.Called() diff --git a/persist/sqldb/null_workflow_archive.go b/persist/sqldb/null_workflow_archive.go index 74218279ee64..3be1c0426afb 100644 --- a/persist/sqldb/null_workflow_archive.go +++ b/persist/sqldb/null_workflow_archive.go @@ -25,6 +25,10 @@ func (r *nullWorkflowArchive) ListWorkflows(string, string, string, time.Time, t return wfv1.Workflows{}, nil } +func (r *nullWorkflowArchive) CountWorkflows(string, string, string, time.Time, time.Time, labels.Requirements) (int64, error) { + return 0, nil +} + func (r *nullWorkflowArchive) GetWorkflow(string) (*wfv1.Workflow, error) { return nil, fmt.Errorf("getting archived workflows not supported") } diff --git a/persist/sqldb/workflow_archive.go b/persist/sqldb/workflow_archive.go index bbbdea4e8fa8..ac7ad1d7436f 100644 --- a/persist/sqldb/workflow_archive.go +++ b/persist/sqldb/workflow_archive.go @@ -46,12 +46,17 @@ type archivedWorkflowLabelRecord struct { Value string `db:"value"` } +type archivedWorkflowCount struct { + Total uint64 `db:"total,omitempty" json:"total"` +} + //go:generate mockery --name=WorkflowArchive type WorkflowArchive interface { ArchiveWorkflow(wf *wfv1.Workflow) error // list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent) ListWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements, limit, offset int) (wfv1.Workflows, error) + CountWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) GetWorkflow(uid string) (*wfv1.Workflow, error) DeleteWorkflow(uid string) error DeleteExpiredWorkflows(ttl time.Duration) error @@ -185,6 +190,30 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi return wfs, nil } +func (r *workflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements) (int64, error) { + total := &archivedWorkflowCount{} + clause, err := labelsClause(r.dbType, labelRequirements) + if err != nil { + return 0, err + } + + err = r.session. + Select(db.Raw("count(*) as total")). + From(archiveTableName). + Where(r.clusterManagedNamespaceAndInstanceID()). + And(namespaceEqual(namespace)). + And(nameEqual(name)). + And(namePrefixClause(namePrefix)). + And(startedAtClause(minStartedAt, maxStartedAt)). + And(clause). + One(total) + if err != nil { + return 0, err + } + + return int64(total.Total), nil +} + func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() db.Compound { return db.And( db.Cond{"clustername": r.clusterName}, diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index 735293814db4..9bec929ef0eb 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -56,6 +56,7 @@ func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req name := "" minStartedAt := time.Time{} maxStartedAt := time.Time{} + showRemainingItemCount := false for _, selector := range strings.Split(options.FieldSelector, ",") { if len(selector) == 0 { continue @@ -74,6 +75,11 @@ func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req if err != nil { return nil, err } + } else if strings.HasPrefix(selector, "ext.showRemainingItemCount") { + showRemainingItemCount, err = strconv.ParseBool(strings.TrimPrefix(selector, "ext.showRemainingItemCount=")) + if err != nil { + return nil, err + } } else { return nil, fmt.Errorf("unsupported requirement %s", selector) } @@ -109,6 +115,21 @@ func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req meta := metav1.ListMeta{} + if showRemainingItemCount && !loadAll { + total, err := w.wfArchive.CountWorkflows(namespace, name, namePrefix, minStartedAt, maxStartedAt, requirements) + if err != nil { + return nil, err + } + var count = total - int64(offset) - int64(items.Len()) + if len(items) > limit { + count = count + 1 + } + if count < 0 { + count = 0 + } + meta.RemainingItemCount = &count + } + if !loadAll && len(items) > limit { items = items[0:limit] meta.Continue = fmt.Sprintf("%v", offset+limit) diff --git a/server/workflowarchive/archived_workflow_server_test.go b/server/workflowarchive/archived_workflow_server_test.go index 96b7802d5878..387bebd28a4c 100644 --- a/server/workflowarchive/archived_workflow_server_test.go +++ b/server/workflowarchive/archived_workflow_server_test.go @@ -57,6 +57,7 @@ func Test_archivedWorkflowServer(t *testing.T) { repo.On("ListWorkflows", "", "my-name", "", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) repo.On("ListWorkflows", "", "", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) repo.On("ListWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil), 2, 0).Return(wfv1.Workflows{{}}, nil) + repo.On("CountWorkflows", "", "my-name", "my-", minStartAt, maxStartAt, labels.Requirements(nil)).Return(int64(5), nil) repo.On("GetWorkflow", "").Return(nil, nil) repo.On("GetWorkflow", "my-uid").Return(&wfv1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "my-name"}, @@ -154,6 +155,12 @@ func Test_archivedWorkflowServer(t *testing.T) { assert.Len(t, resp.Items, 1) assert.Empty(t, resp.Continue) } + resp, err = w.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{ListOptions: &metav1.ListOptions{FieldSelector: "metadata.name=my-name,spec.startedAt>2020-01-01T00:00:00Z,spec.startedAt<2020-01-02T00:00:00Z,ext.showRemainingItemCount=true", Limit: 1}, NamePrefix: "my-"}) + if assert.NoError(t, err) { + assert.Len(t, resp.Items, 1) + assert.Equal(t, *resp.ListMeta.RemainingItemCount, int64(4)) + assert.Empty(t, resp.Continue) + } }) t.Run("GetArchivedWorkflow", func(t *testing.T) { allowed = false From b8b658cd45856357e825055219339e6bdf4df01c Mon Sep 17 00:00:00 2001 From: hadesy <6346047+hadesy@users.noreply.github.com> Date: Mon, 18 Jul 2022 00:04:48 +0800 Subject: [PATCH 2/2] fix: generated code Signed-off-by: hadesy <6346047+hadesy@users.noreply.github.com> --- persist/sqldb/mocks/WorkflowArchive.go | 41 +++++++++++++------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/persist/sqldb/mocks/WorkflowArchive.go b/persist/sqldb/mocks/WorkflowArchive.go index 22fe71a78191..687c8b8810c2 100644 --- a/persist/sqldb/mocks/WorkflowArchive.go +++ b/persist/sqldb/mocks/WorkflowArchive.go @@ -30,6 +30,27 @@ func (_m *WorkflowArchive) ArchiveWorkflow(wf *v1alpha1.Workflow) error { return r0 } +// CountWorkflows provides a mock function with given fields: namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements +func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartAt time.Time, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) { + ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + + var r0 int64 + if rf, ok := ret.Get(0).(func(string, string, string, time.Time, time.Time, labels.Requirements) int64); ok { + r0 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + } else { + r0 = ret.Get(0).(int64) + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements) error); ok { + r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // DeleteExpiredWorkflows provides a mock function with given fields: ttl func (_m *WorkflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error { ret := _m.Called(ttl) @@ -118,26 +139,6 @@ func (_m *WorkflowArchive) ListWorkflows(namespace string, name string, namePref return r0, r1 } -func (_m *WorkflowArchive) CountWorkflows(namespace string, name string, namePrefix string, minStartAt, maxStartAt time.Time, labelRequirements labels.Requirements) (int64, error) { - ret := _m.Called(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) - - var r0 int64 - if rf, ok := ret.Get(0).(func() int64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(int64) - } - - var r1 error - if rf, ok := ret.Get(1).(func(string, string, string, time.Time, time.Time, labels.Requirements) error); ok { - r1 = rf(namespace, name, namePrefix, minStartAt, maxStartAt, labelRequirements) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - // ListWorkflowsLabelKeys provides a mock function with given fields: func (_m *WorkflowArchive) ListWorkflowsLabelKeys() (*v1alpha1.LabelKeys, error) { ret := _m.Called()