Skip to content

Commit e426e61

Browse files
committed
fix: redis lock improvments
1 parent e627422 commit e426e61

File tree

5 files changed

+68
-83
lines changed

5 files changed

+68
-83
lines changed

Dockerfile.build

-56
This file was deleted.

Dockerfile.test

+7-9
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,21 @@
11
FROM ghcr.io/aldor007/mort-base
22

3-
ENV GOLANG_VERSION 1.16.6
4-
ARG TARGETARCH amd64
3+
ENV GOLANG_VERSION 1.19.2
4+
ENV TARGETARCH amd64
55
ARG TAG 'dev'
66
ARG COMMIT "master"
77
ARG DATE "now"
88

9-
ENV GOLANG_DOWNLOAD_URL https://golang.org/dl/go$GOLANG_VERSION.linux-$TARGETARCH.tar.gz
10-
11-
RUN curl -fsSL --insecure "$GOLANG_DOWNLOAD_URL" -o golang.tar.gz \
12-
&& tar -C /usr/local -xzf golang.tar.gz \
13-
&& rm golang.tar.gz
149

1510
ENV WORKDIR /workspace
1611
ENV PATH /usr/local/go/bin:$PATH
17-
12+
RUN rm -rf /usr/local/go && curl -fsSL --insecure "https://go.dev/dl/go$GOLANG_VERSION.linux-$TARGETARCH.tar.gz" -o golang.tar.gz \
13+
&& tar -C /usr/local -xzf golang.tar.gz \
14+
&& rm golang.tar.gz
1815

1916
WORKDIR $WORKDIR
2017
COPY go.mod ./
2118
COPY go.sum ./
22-
RUN go mod download
2319

2420
COPY cmd/ $WORKDIR/cmd
2521
COPY .godir ${WORKDIR}/.godir
@@ -28,3 +24,5 @@ COPY etc/ ${WORKDIR}/etc
2824
COPY pkg/ ${WORKDIR}/pkg
2925
COPY scripts/ ${WORKDIR}/scripts
3026
COPY Makefile ${WORKDIR}/Makefile
27+
28+
RUN go build -o /go/mort ./cmd/mort/mort.go

pkg/lock/redis.go

+22-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package lock
22

33
import (
4+
"github.com/aldor007/mort/pkg/monitoring"
5+
"go.uber.org/zap"
46
"sync"
57
"time"
68

@@ -34,6 +36,7 @@ type rediser interface {
3436
}
3537

3638
type internalLockRedis struct {
39+
lockData
3740
lock *redislock.Lock
3841
pubsub *goRedis.PubSub
3942
}
@@ -86,11 +89,15 @@ func (m *RedisLock) NotifyAndRelease(ctx context.Context, key string, originalRe
8689
defer m.lock.Unlock()
8790
lock, ok := m.locks[key]
8891
if ok {
89-
lock.lock.Release(ctx)
92+
err := lock.lock.Release(ctx)
93+
if err != nil {
94+
monitoring.Log().Error("redis release error", zap.String("key", key), zap.Error(err))
95+
}
9096
delete(m.locks, key)
9197
if lock.pubsub != nil {
92-
m.redisClient.Publish(ctx, key, 1)
9398
lock.pubsub.Close()
99+
} else {
100+
m.redisClient.Publish(ctx, key, 1)
94101
}
95102
}
96103

@@ -106,41 +113,45 @@ func (m *RedisLock) Lock(ctx context.Context, key string) (result LockResult, ok
106113
lock.lock.Refresh(ctx, time.Second*time.Duration(m.LockTimeout/2), nil)
107114
} else {
108115
lock, err := m.client.Obtain(ctx, key, time.Duration(m.LockTimeout)*time.Second, nil)
109-
pubsub := m.redisClient.Subscribe(ctx, key)
110-
lockData := internalLockRedis{lock: lock, pubsub: pubsub}
116+
lockData := internalLockRedis{lock: lock, pubsub: nil}
111117

118+
ok = false
112119
if err == redislock.ErrNotObtained {
113120
result, ok = m.memoryLock.forceLockAndAddWatch(ctx, key)
114-
ok = false
121+
pubsub := m.redisClient.Subscribe(ctx, key)
122+
lockData.pubsub = pubsub
115123
// Go channel which receives messages.
116124
ch := pubsub.Channel()
117125

118126
go func() {
119127
for {
120128
select {
121129
case <-ch:
122-
m.NotifyAndRelease(ctx, key, nil)
130+
m.memoryLock.NotifyAndRelease(ctx, key, nil)
123131
return
124132
case <-result.Cancel:
125133
m.memoryLock.Release(ctx, key)
126-
pubsub.Close()
134+
err := pubsub.Close()
135+
if err != nil {
136+
monitoring.Log().Error("Redis lock pubsub err", zap.String("key", key), zap.Error(err))
137+
}
127138
return
128139
case <-ctx.Done():
129140
m.memoryLock.Release(ctx, key)
130-
pubsub.Close()
141+
err := pubsub.Close()
142+
if err != nil {
143+
monitoring.Log().Error("Redis lock pubsub err", zap.String("key", key), zap.Error(err))
144+
}
131145
return
132146
}
133147
}
134148
}()
135149

136-
return
137-
138150
} else if err != nil {
139151
result.Error = err
140152
return
141153
}
142154
m.locks[key] = lockData
143-
144155
}
145156
return m.memoryLock.Lock(ctx, key)
146157
}

pkg/lock/redis_test.go

+35
Original file line numberDiff line numberDiff line change
@@ -142,3 +142,38 @@ func TestRedisLock_NotifyAndReleaseTwoInstancesOfLock(t *testing.T) {
142142

143143
}
144144
}
145+
func TestRedisLock_Cancel(t *testing.T) {
146+
s := miniredis.RunT(t)
147+
148+
l := NewRedisLock([]string{s.Addr()}, nil)
149+
key := "kluczi22"
150+
ctx := context.Background()
151+
c, acquired := l.Lock(ctx, key)
152+
153+
assert.True(t, acquired, "Should acquire lock")
154+
assert.Nil(t, c.ResponseChan, "shouldn't return channel")
155+
156+
l2 := NewRedisLock([]string{s.Addr()}, nil)
157+
result, lock := l2.Lock(ctx, key)
158+
159+
assert.False(t, lock, "Shouldn't acquire lock")
160+
assert.NotNil(t, result.ResponseChan, "should return channel")
161+
162+
go func() {
163+
c.Cancel <- true
164+
}()
165+
166+
buf := make([]byte, 1000)
167+
go l.NotifyAndRelease(ctx, key, response.NewBuf(200, buf))
168+
169+
timer := time.NewTimer(time.Second * 2)
170+
select {
171+
case <-timer.C:
172+
t.Fatalf("timeout waiting for lock")
173+
return
174+
case res, ok := <-result.ResponseChan:
175+
assert.False(t, ok, "channel shouldn't be closed")
176+
assert.Nil(t, res, "should be nil")
177+
178+
}
179+
}

pkg/processor/processor.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ type requestMessage struct {
6262
responseChan chan *response.Response
6363
obj *object.FileObject
6464
request *http.Request
65-
cancel chan struct{}
6665
}
6766

6867
// Process handle incoming request and create response
@@ -72,7 +71,7 @@ func (r *RequestProcessor) Process(req *http.Request, obj *object.FileObject) *r
7271
var timeout context.CancelFunc
7372

7473
// if object has transfroms we can assume that it is image generated by mort
75-
// othervise it can be big zip file
74+
// otherwise it can be big zip file
7675
if obj.HasTransform() {
7776
ctx, timeout = context.WithTimeout(pCtx, r.processTimeout)
7877
defer timeout()
@@ -85,13 +84,11 @@ func (r *RequestProcessor) Process(req *http.Request, obj *object.FileObject) *r
8584
msg.request = req
8685
msg.obj = obj
8786
msg.responseChan = make(chan *response.Response)
88-
msg.cancel = make(chan struct{}, 1)
8987

9088
go r.processChan(ctx, msg)
9189

9290
select {
9391
case <-ctx.Done():
94-
close(msg.cancel)
9592
monitoring.Log().Warn("Process timeout", obj.LogData(zap.String("error", "Context.timeout"))...)
9693
return r.replyWithError(obj, 499, errContextCancel)
9794
case res := <-msg.responseChan:
@@ -104,9 +101,6 @@ func (r *RequestProcessor) Process(req *http.Request, obj *object.FileObject) *r
104101
func (r *RequestProcessor) processChan(ctx context.Context, msg requestMessage) {
105102
res := r.process(msg.request, msg.obj)
106103
select {
107-
case <-msg.cancel:
108-
res.Close()
109-
return
110104
case <-ctx.Done():
111105
res.Close()
112106
return
@@ -243,6 +237,9 @@ func (r *RequestProcessor) collapseGET(req *http.Request, obj *object.FileObject
243237
return r.replyWithError(obj, 504, errContextCancel)
244238
case res, ok := <-lockResult.ResponseChan:
245239
if !ok || res == nil {
240+
if cacheRes, err := r.responseCache.Get(obj); err == nil {
241+
return cacheRes
242+
}
246243
return r.handleGET(req, obj)
247244
}
248245
return res

0 commit comments

Comments
 (0)