Skip to content

Commit

Permalink
Merge branch 'main' into gc-multiple
Browse files Browse the repository at this point in the history
  • Loading branch information
wy65701436 authored Jun 29, 2023
2 parents 74ab4fd + 02a1c41 commit 0574a34
Show file tree
Hide file tree
Showing 43 changed files with 1,082 additions and 320 deletions.
37 changes: 27 additions & 10 deletions src/jobservice/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ To let the job service recognize the job, the implementation of job should follo
A valid job must implement the job interface. For the details of each method defined in the job interface, you can refer the comments attached with the method.

```go

// Interface defines the related injection and run entry methods.
type Interface interface {
// Declare how many times the job can be retried if failed.
Expand All @@ -76,7 +77,13 @@ type Interface interface {
// uint: the failure count allowed. If it is set to 0, then default value 4 is used.
MaxFails() uint

// Tell the worker pool if retry the failed job when the fails is
// Max currency of the job. Unlike the WorkerPool concurrency, it controls the limit on the number jobs of that type
// that can be active at one time by within a single redis instance.
//
// The default value is 0, which means "no limit on job concurrency".
MaxCurrency() uint

// Tell the worker worker if retry the failed job when the fails is
// still less that the number declared by the method 'MaxFails'.
//
// Returns:
Expand All @@ -87,18 +94,18 @@ type Interface interface {
//
// Return:
// error if parameters are not valid. NOTES: If no parameters needed, directly return nil.
Validate(params map[string]interface{}) error
Validate(params Parameters) error

// Run the business logic here.
// The related arguments will be injected by the workerpool.
//
// ctx env.JobContext : Job execution context.
// ctx Context : Job execution context.
// params map[string]interface{} : parameters with key-pair style for the job execution.
//
// Returns:
// error if failed to run. NOTES: If job is stopped or cancelled, a specified error should be returned
//
Run(ctx env.JobContext, params map[string]interface{}) error
Run(ctx Context, params Parameters) error
}
```

Expand Down Expand Up @@ -180,14 +187,19 @@ func (dj *DemoJob) MaxFails() uint {
return 3
}

// MaxCurrency is implementation of same method in Interface.
func (dj *DemoJob) MaxCurrency() uint {
return 1
}

// ShouldRetry ...
func (dj *DemoJob) ShouldRetry() bool {
return true
}

// Validate is implementation of same method in Interface.
func (dj *DemoJob) Validate(params map[string]interface{}) error {
if params == nil || len(params) == 0 {
func (dj *DemoJob) Validate(params job.Parameters) error {
if len(params) == 0 {
return errors.New("parameters required for replication job")
}
name, ok := params["image"]
Expand All @@ -196,14 +208,14 @@ func (dj *DemoJob) Validate(params map[string]interface{}) error {
}

if !strings.HasPrefix(name.(string), "demo") {
return fmt.Errorf("expected '%s' but got '%s'", "demo steven", name)
return fmt.Errorf("expected '%s' but got '%s'", "demo *", name)
}

return nil
}

// Run the replication logic here.
func (dj *DemoJob) Run(ctx env.JobContext, params map[string]interface{}) error {
func (dj *DemoJob) Run(ctx job.Context, params job.Parameters) error {
logger := ctx.GetLogger()

defer func() {
Expand Down Expand Up @@ -292,7 +304,7 @@ Any jobs can launch new jobs through the launch function in the job context. All

```go

func (j *Job) Run(ctx env.JobContext, params map[string]interface{}) error{
func (j *Job) Run(ctx job.Context, params job.Parameters) error{
// ...
subJob, err := ctx.LaunchJob(models.JobRequest{})
// ...
Expand Down Expand Up @@ -333,6 +345,7 @@ var knownLoggers = map[string]*Declaration{
So far, only the following two backends are supported:

* **STD_OUTPUT**: Output the log to the std stream (stdout/stderr)
* **DB**: Output the log to the database with the table name `job_log`
* **FILE**: Output the log to the log files
* sweeper supports
* getter supports
Expand All @@ -354,7 +367,7 @@ An example:
```yaml
#Loggers
loggers:
- name: "STD_OUTPUT" # logger backend name, only support "FILE" and "STD_OUTPUT"
- name: "STD_OUTPUT" # logger backend name, only support "DB", "FILE" and "STD_OUTPUT"
level: "DEBUG" # INFO/DEBUG/WARNING/ERROR/FATAL
- name: "FILE"
level: "DEBUG"
Expand All @@ -364,6 +377,10 @@ loggers:
duration: 1 #days
settings: # Customized settings of sweeper
work_dir: "/tmp/job_logs"
- name: "DB"
level: "DEBUG"
sweeper:
duration: 1 #days
```
## Configuration
Expand Down
2 changes: 1 addition & 1 deletion src/jobservice/mgt/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/goharbor/harbor/src/lib/errors"
)

// Manager defies the related operations to handle the management of job stats.
// Manager defines the related operations to handle the management of job stats.
type Manager interface {
// Get the stats data of all kinds of jobs.
// Data returned by pagination.
Expand Down
13 changes: 11 additions & 2 deletions src/lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ var (
ErrNotFound = errors.New("key not found")
)

// Iterator returns the ScanIterator
type Iterator interface {
Next(ctx context.Context) bool
Val() string
}

//go:generate mockery --name Cache --output . --outpkg cache --filename mock_cache_test.go --structname mockCache --inpackage

// Cache cache interface
type Cache interface {
// Contains returns true if key exists
Expand All @@ -57,8 +65,9 @@ type Cache interface {
// Save cache the value by key
Save(ctx context.Context, key string, value interface{}, expiration ...time.Duration) error

// Keys returns the key matched by prefixes
Keys(ctx context.Context, prefixes ...string) ([]string, error)
// Scan scans the keys matched by match string
// NOTICE: memory cache does not support use wildcard, compared by strings.Contains
Scan(ctx context.Context, match string) (Iterator, error)
}

var (
Expand Down
13 changes: 6 additions & 7 deletions src/lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
"fmt"
"testing"

"github.com/goharbor/harbor/src/lib/retry"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

cachetesting "github.com/goharbor/harbor/src/testing/lib/cache"
"github.com/goharbor/harbor/src/testing/mock"
)

type CacheTestSuite struct {
Expand All @@ -30,7 +29,7 @@ type CacheTestSuite struct {

func (suite *CacheTestSuite) SetupSuite() {
Register("mock", func(opts Options) (Cache, error) {
return &cachetesting.Cache{}, nil
return &mockCache{}, nil
})
}

Expand Down Expand Up @@ -62,8 +61,8 @@ func (suite *CacheTestSuite) TestInitialize() {

{
Register("cache", func(opts Options) (Cache, error) {
c := &cachetesting.Cache{}
c.On("Ping", mock.Anything).Return(fmt.Errorf("oops"))
c := &mockCache{}
c.On("Ping", mock.Anything).Return(retry.Abort(fmt.Errorf("oops")))

return c, nil
})
Expand All @@ -75,7 +74,7 @@ func (suite *CacheTestSuite) TestInitialize() {

{
Register("cache", func(opts Options) (Cache, error) {
c := &cachetesting.Cache{}
c := &mockCache{}
c.On("Ping", mock.Anything).Return(nil)

return c, nil
Expand Down
9 changes: 4 additions & 5 deletions src/lib/cache/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/stretchr/testify/suite"

cachetesting "github.com/goharbor/harbor/src/testing/lib/cache"
"github.com/goharbor/harbor/src/testing/mock"
)

Expand All @@ -42,7 +41,7 @@ func (suite *FetchOrSaveTestSuite) SetupSuite() {
}

func (suite *FetchOrSaveTestSuite) TestFetchInternalError() {
c := &cachetesting.Cache{}
c := &mockCache{}

mock.OnAnything(c, "Fetch").Return(fmt.Errorf("oops"))

Expand All @@ -55,7 +54,7 @@ func (suite *FetchOrSaveTestSuite) TestFetchInternalError() {
}

func (suite *FetchOrSaveTestSuite) TestBuildError() {
c := &cachetesting.Cache{}
c := &mockCache{}

mock.OnAnything(c, "Fetch").Return(ErrNotFound)

Expand All @@ -68,7 +67,7 @@ func (suite *FetchOrSaveTestSuite) TestBuildError() {
}

func (suite *FetchOrSaveTestSuite) TestSaveError() {
c := &cachetesting.Cache{}
c := &mockCache{}

mock.OnAnything(c, "Fetch").Return(ErrNotFound)
mock.OnAnything(c, "Save").Return(fmt.Errorf("oops"))
Expand All @@ -83,7 +82,7 @@ func (suite *FetchOrSaveTestSuite) TestSaveError() {
}

func (suite *FetchOrSaveTestSuite) TestSaveCalledOnlyOneTime() {
c := &cachetesting.Cache{}
c := &mockCache{}

var data sync.Map

Expand Down
58 changes: 43 additions & 15 deletions src/lib/cache/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,27 +117,55 @@ func (c *Cache) Save(ctx context.Context, key string, value interface{}, expirat
return nil
}

// Keys returns the key matched by prefixes.
func (c *Cache) Keys(ctx context.Context, prefixes ...string) ([]string, error) {
// if no prefix, means match all keys.
matchAll := len(prefixes) == 0
// range map to get all keys
keys := make([]string, 0)
// Scan scans the keys matched by match string
func (c *Cache) Scan(ctx context.Context, match string) (cache.Iterator, error) {
var keys []string
c.storage.Range(func(k, v interface{}) bool {
ks := k.(string)
if matchAll {
keys = append(keys, ks)
} else {
for _, p := range prefixes {
if strings.HasPrefix(ks, c.opts.Key(p)) {
keys = append(keys, strings.TrimPrefix(ks, c.opts.Prefix))
}
matched := true
if match != "" {
matched = strings.Contains(k.(string), match)
}

if matched {
if v.(*entry).isExpirated() {
c.storage.Delete(k)
} else {
keys = append(keys, strings.TrimPrefix(k.(string), c.opts.Prefix))
}
}
return true
})

return keys, nil
return &ScanIterator{keys: keys}, nil
}

// ScanIterator is a ScanIterator for memory cache
type ScanIterator struct {
mu sync.Mutex
pos int
keys []string
}

// Next checks whether has the next element
func (i *ScanIterator) Next(ctx context.Context) bool {
i.mu.Lock()
defer i.mu.Unlock()

i.pos++
return i.pos <= len(i.keys)
}

// Val returns the key
func (i *ScanIterator) Val() string {
i.mu.Lock()
defer i.mu.Unlock()

var val string
if i.pos <= len(i.keys) {
val = i.keys[i.pos-1]
}

return val
}

// New returns memory cache
Expand Down
71 changes: 49 additions & 22 deletions src/lib/cache/memory/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package memory

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -109,28 +110,54 @@ func (suite *CacheTestSuite) TestPing() {
suite.NoError(suite.cache.Ping(suite.ctx))
}

func (suite *CacheTestSuite) TestKeys() {
key1 := "p1"
key2 := "p2"

var err error
err = suite.cache.Save(suite.ctx, key1, "hello, p1")
suite.Nil(err)
err = suite.cache.Save(suite.ctx, key2, "hello, p2")
suite.Nil(err)

// should match all
keys, err := suite.cache.Keys(suite.ctx, "p")
suite.Nil(err)
suite.ElementsMatch([]string{"p1", "p2"}, keys)
// only get p1
keys, err = suite.cache.Keys(suite.ctx, key1)
suite.Nil(err)
suite.Equal([]string{"p1"}, keys)
// only get p2
keys, err = suite.cache.Keys(suite.ctx, key2)
suite.Nil(err)
suite.Equal([]string{"p2"}, keys)
func (suite *CacheTestSuite) TestScan() {
seed := func(n int) {
for i := 0; i < n; i++ {
key := fmt.Sprintf("test-scan-%d", i)
err := suite.cache.Save(suite.ctx, key, "")
suite.NoError(err)
}
}
clean := func(n int) {
for i := 0; i < n; i++ {
key := fmt.Sprintf("test-scan-%d", i)
err := suite.cache.Delete(suite.ctx, key)
suite.NoError(err)
}
}
{
// no match should return all keys
expect := []string{"test-scan-0", "test-scan-1", "test-scan-2"}
// seed data
seed(3)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
got = append(got, iter.Val())
}
suite.ElementsMatch(expect, got)
// clean up
clean(3)
}

{
// with match should return matched keys
expect := []string{"test-scan-1", "test-scan-10"}
// seed data
seed(11)
// test scan
iter, err := suite.cache.Scan(suite.ctx, "test-scan-1")
suite.NoError(err)
got := []string{}
for iter.Next(suite.ctx) {
got = append(got, iter.Val())
}
suite.ElementsMatch(expect, got)
// clean up
clean(11)
}
}

func TestCacheTestSuite(t *testing.T) {
Expand Down
Loading

0 comments on commit 0574a34

Please sign in to comment.