Skip to content

Commit

Permalink
[#127]: feature: support ACK/NACK/REQUEUE
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jun 20, 2024
2 parents 446cfd4 + 475b310 commit 263623f
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ jobs:
cache: false

- name: Run linter
uses: golangci/golangci-lint-action@v4.0.0 # Action page: <https://github.com/golangci/golangci-lint-action>
uses: golangci/golangci-lint-action@v6.0.1 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.57 # without patch version
version: v1.59 # without patch version
only-new-issues: false # show only new issues if it's a pull request
args: --timeout=10m --build-tags=race
8 changes: 4 additions & 4 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
timeout-minutes: 60
strategy:
matrix:
php: [ "8.2" ]
go: [ stable ]
os: [ "ubuntu-latest" ]
php: ["8.3"]
go: [stable]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v5 # action page: <https://github.com/actions/setup-go>
Expand Down Expand Up @@ -68,7 +68,7 @@ jobs:
docker compose -f env/docker-compose-jobs.yaml up -d
sleep 30
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/jobs.out -covermode=atomic jobs_general_test.go
docker compose -f env/docker-compose-jobs.yaml down
Expand Down
2 changes: 2 additions & 0 deletions interfaces.go → apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Pool interface {
}

type Configurer interface {
// Experimental checks if there are any experimental features enabled.
Experimental() bool
// UnmarshalKey takes a single key and unmarshal it into a Struct.
UnmarshalKey(name string, out any) error
// Has checks if config section exists.
Expand Down
8 changes: 8 additions & 0 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ func (p *Plugin) listener() {
p.metrics.CountJobErr()
p.log.Error("response handler error", zap.Error(err), zap.String("ID", jb.ID()), zap.ByteString("response", resp.Body), zap.Time("start", start), zap.Int64("elapsed", time.Since(start).Milliseconds()))
p.putPayload(exec)
// we don't need to use ACK to prevent endless loop here, since the ACK is controlled on the PHP side.
// When experimental features are enabled, skip further processing of the current job.
if p.experimental {
jb = nil
span.End()
continue
}

/*
Job malformed, acknowledge it to prevent endless loop
*/
Expand Down
10 changes: 6 additions & 4 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ type Plugin struct {
mu sync.RWMutex

// Jobs plugin configuration
cfg *Config `structure:"jobs"`
log *zap.Logger
workersPool Pool
server Server
cfg *Config `structure:"jobs"`
log *zap.Logger
workersPool Pool
server Server
experimental bool

jobConstructors map[string]jobsApi.Constructor
consumers sync.Map // map[string]jobs.Consumer
Expand Down Expand Up @@ -117,6 +118,7 @@ func (p *Plugin) Init(cfg Configurer, log Logger, server Server) error {
p.log = new(zap.Logger)
p.log = log.NamedLogger(PluginName)
p.jobsProcessor = newPipesProc(p.log, &p.consumers, &p.consume, p.cfg.CfgOptions.Parallelism)
p.experimental = cfg.Experimental()

// collector
p.metrics = newStatsExporter(p)
Expand Down
25 changes: 24 additions & 1 deletion protocol/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (rh *RespHandler) handleErrResp(data []byte, jb jobs.Job) error {
return nil
}

// user don't want to requeue the job - silently ACK and return nil
// the user doesn't want to requeue the job - silently ACK and return nil
errAck := jb.Ack()
if errAck != nil {
rh.log.Error("job acknowledge was failed", zap.Error(errors.E(er.Msg)), zap.Error(errAck))
Expand All @@ -37,3 +37,26 @@ func (rh *RespHandler) handleErrResp(data []byte, jb jobs.Job) error {

return nil
}

func (rh *RespHandler) requeue(data []byte, jb jobs.Job) error {
er := rh.getErrResp()
defer rh.putErrResp(er)

err := json.Unmarshal(data, er)
if err != nil {
return err
}

err = jb.Requeue(er.Headers, er.Delay)
if err != nil {
return err
}

rh.log.Info("job was re-queued",
zap.String("message", er.Msg),
zap.Int64("delay", er.Delay),
zap.Bool("requeue", er.Requeue),
)

return nil
}
21 changes: 21 additions & 0 deletions protocol/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type Type uint32
const (
NoError Type = iota
Error
ACK
NACK
REQUEUE
)

// internal worker protocol (jobs mode)
Expand Down Expand Up @@ -76,6 +79,24 @@ func (rh *RespHandler) Handle(pld *payload.Payload, jb jobs.Job) error {
return errors.E(op, err)
}
return nil
case ACK:
err = jb.Ack()
if err != nil {
return errors.E(op, err)
}
return nil
case NACK:
err = jb.Nack()
if err != nil {
return errors.E(op, err)
}
return nil
case REQUEUE:
err = rh.requeue(p.Data, jb)
if err != nil {
return errors.E(op, err)
}
return nil
default:
rh.log.Warn("unknown response type, acknowledging the JOB", zap.Uint32("type", uint32(p.T)))
err = jb.Ack()
Expand Down

0 comments on commit 263623f

Please sign in to comment.