Skip to content

Commit b8c656e

Browse files
authored
fix(cdn): get item unit from buffer while deleting it (#5831)
1 parent b2559e4 commit b8c656e

File tree

6 files changed

+279
-13
lines changed

6 files changed

+279
-13
lines changed

engine/cdn/cdn_gc.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/rockbears/log"
99

10+
"github.com/ovh/cds/engine/cache"
1011
"github.com/ovh/cds/engine/cdn/item"
1112
"github.com/ovh/cds/engine/cdn/storage"
1213
"github.com/ovh/cds/sdk"
@@ -153,22 +154,49 @@ func (s *Service) cleanBuffer(ctx context.Context) error {
153154
if len(itemIDs) == 0 {
154155
continue
155156
}
156-
157157
itemUnitsIDs, err := storage.LoadAllItemUnitsIDsByItemIDsAndUnitID(s.mustDBWithCtx(ctx), bu.ID(), itemIDs)
158158
if err != nil {
159159
ctx := sdk.ContextWithStacktrace(ctx, err)
160160
log.Error(ctx, "unable to load item units: %v", err)
161161
continue
162162
}
163163

164+
itemUnitsToMark := make([]string, 0, len(itemUnitsIDs))
165+
for i := range itemUnitsIDs {
166+
uiID := itemUnitsIDs[i]
167+
lockKey := cache.Key(storage.FileBufferKey, bu.ID(), "lock", uiID)
168+
b, err := s.Cache.Lock(lockKey, 30*time.Second, 0, 1)
169+
if err != nil {
170+
log.Error(ctx, "unable to lock unit item: %v: %v", uiID, err)
171+
continue
172+
}
173+
if !b {
174+
log.Info(ctx, "do not delete item unit %s, already locked: %v", uiID)
175+
continue
176+
}
177+
readerPatternKey := cache.Key(storage.FileBufferKey, bu.ID(), "reader", uiID, "*")
178+
keys, err := s.Cache.Keys(cache.Key(readerPatternKey))
179+
if err != nil {
180+
log.Error(ctx, "unable to check if item unit is currently reading by cdn")
181+
_ = s.Cache.Unlock(lockKey)
182+
continue
183+
}
184+
if len(keys) > 0 {
185+
log.Info(ctx, "do not delete item unit, it is currently reading by cdn")
186+
_ = s.Cache.Unlock(lockKey)
187+
continue
188+
}
189+
itemUnitsToMark = append(itemUnitsToMark, uiID)
190+
}
191+
164192
tx, err := s.mustDBWithCtx(ctx).Begin()
165193
if err != nil {
166194
ctx := sdk.ContextWithStacktrace(ctx, err)
167195
log.Error(ctx, "unable to start transaction: %v", err)
168196
continue
169197
}
170198

171-
if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsIDs); err != nil {
199+
if _, err := storage.MarkItemUnitToDelete(tx, itemUnitsToMark); err != nil {
172200
_ = tx.Rollback()
173201
ctx := sdk.ContextWithStacktrace(ctx, err)
174202
log.Error(ctx, "unable to mark item as delete: %v", err)

engine/cdn/cdn_gc_test.go

+130-4
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@ import (
66
"testing"
77
"time"
88

9+
"github.com/ovh/symmecrypt/ciphers/aesgcm"
10+
"github.com/ovh/symmecrypt/convergent"
11+
"github.com/rockbears/log"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/ovh/cds/engine/cache"
915
"github.com/ovh/cds/engine/cdn/item"
1016
"github.com/ovh/cds/engine/cdn/lru"
1117
"github.com/ovh/cds/engine/cdn/storage"
1218
cdntest "github.com/ovh/cds/engine/cdn/test"
1319
"github.com/ovh/cds/engine/gorpmapper"
1420
"github.com/ovh/cds/engine/test"
1521
"github.com/ovh/cds/sdk"
16-
"github.com/ovh/symmecrypt/ciphers/aesgcm"
17-
"github.com/ovh/symmecrypt/convergent"
18-
"github.com/rockbears/log"
19-
"github.com/stretchr/testify/require"
2022
)
2123

2224
func TestCleanSynchronizedItem(t *testing.T) {
@@ -503,3 +505,127 @@ func TestPurgeItem(t *testing.T) {
503505
require.NoError(t, err)
504506
require.Equal(t, 1, len(items))
505507
}
508+
509+
func TestCleanSynchronizedReadingItem(t *testing.T) {
510+
m := gorpmapper.New()
511+
item.InitDBMapping(m)
512+
storage.InitDBMapping(m)
513+
514+
log.Factory = log.NewTestingWrapper(t)
515+
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
516+
t.Cleanup(cancel)
517+
518+
cfg := test.LoadTestingConf(t, sdk.TypeCDN)
519+
520+
cdntest.ClearItem(t, context.TODO(), m, db)
521+
cdntest.ClearUnits(t, context.TODO(), m, db)
522+
523+
// Create cdn service
524+
s := Service{
525+
DBConnectionFactory: factory,
526+
Cache: store,
527+
Mapper: m,
528+
}
529+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
530+
531+
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
532+
require.NoError(t, err)
533+
534+
tmpDir2, err := ioutil.TempDir("", t.Name()+"-cdn-2-*")
535+
require.NoError(t, err)
536+
537+
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
538+
t.Cleanup(cancel)
539+
540+
cdnUnits, err := storage.Init(ctx, m, store, db.DbMap, sdk.NewGoRoutines(ctx), storage.Configuration{
541+
HashLocatorSalt: "thisismysalt",
542+
Buffers: map[string]storage.BufferConfiguration{
543+
"redis_buffer": {
544+
Redis: &storage.RedisBufferConfiguration{
545+
Host: cfg["redisHost"],
546+
Password: cfg["redisPassword"],
547+
},
548+
BufferType: storage.CDNBufferTypeLog,
549+
},
550+
"file_buffer": {
551+
Local: &storage.LocalBufferConfiguration{
552+
Path: tmpDir2,
553+
},
554+
BufferType: storage.CDNBufferTypeFile,
555+
},
556+
},
557+
Storages: map[string]storage.StorageConfiguration{
558+
"fs-backend": {
559+
Local: &storage.LocalStorageConfiguration{
560+
Path: tmpDir,
561+
Encryption: []convergent.ConvergentEncryptionConfig{
562+
{
563+
Cipher: aesgcm.CipherName,
564+
LocatorSalt: "secret_locator_salt",
565+
SecretValue: "secret_value",
566+
},
567+
},
568+
},
569+
},
570+
"cds-backend": {
571+
CDS: &storage.CDSStorageConfiguration{
572+
Host: "lolcat.host",
573+
Token: "mytoken",
574+
},
575+
},
576+
},
577+
})
578+
require.NoError(t, err)
579+
s.Units = cdnUnits
580+
581+
// Add Item in redis / fs/ cds -will be delete from redis
582+
it := sdk.CDNItem{
583+
ID: sdk.UUID(),
584+
Type: sdk.CDNTypeItemRunResult,
585+
Status: sdk.CDNStatusItemCompleted,
586+
APIRefHash: sdk.RandomString(10),
587+
}
588+
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
589+
iuCDS := sdk.CDNItemUnit{UnitID: s.Units.Storages[1].ID(), ItemID: it.ID, Type: it.Type}
590+
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuCDS))
591+
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
592+
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))
593+
iuFileStorage := sdk.CDNItemUnit{UnitID: s.Units.Storages[0].ID(), ItemID: it.ID, Type: it.Type}
594+
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileStorage))
595+
596+
///////////////////////////////////////
597+
// 1st test, getItem Lock the item unit
598+
///////////////////////////////////////
599+
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
600+
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
601+
require.NoError(t, err)
602+
t.Cleanup(func() {
603+
s.Cache.Unlock(lockKey)
604+
})
605+
require.True(t, hasLocked)
606+
require.NoError(t, s.cleanBuffer(context.TODO()))
607+
608+
_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
609+
require.NoError(t, err)
610+
611+
require.NoError(t, s.Cache.Unlock(lockKey))
612+
613+
////////////////////////////////////////////////////////
614+
// 2nd test, getItem is reading the file from the buffer
615+
////////////////////////////////////////////////////////
616+
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", iuFileBuf.ID, sdk.UUID())
617+
require.NoError(t, s.Cache.SetWithTTL(readerKey, true, 30))
618+
require.NoError(t, s.cleanBuffer(context.TODO()))
619+
620+
_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
621+
require.NoError(t, err)
622+
623+
require.NoError(t, s.Cache.Delete(readerKey))
624+
////////////////////////////////////////////////////////
625+
// 3rd test, mark as delete
626+
////////////////////////////////////////////////////////
627+
require.NoError(t, s.cleanBuffer(context.TODO()))
628+
629+
_, err = storage.LoadItemUnitByID(ctx, m, db, iuFileBuf.ID)
630+
require.True(t, sdk.ErrorIs(err, sdk.ErrNotFound))
631+
}

engine/cdn/cdn_item.go

+43-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/rockbears/log"
1717

18+
"github.com/ovh/cds/engine/cache"
1819
"github.com/ovh/cds/engine/cdn/item"
1920
"github.com/ovh/cds/engine/cdn/storage"
2021
"github.com/ovh/cds/engine/gorpmapper"
@@ -208,13 +209,47 @@ func (s *Service) getItemFileValue(ctx context.Context, t sdk.CDNItemType, apiRe
208209

209210
// If item is in Buffer, get from it
210211
if itemUnit != nil {
211-
log.Error(ctx, "getItemFileValue> Getting file from buffer")
212-
213-
rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
212+
log.Debug(ctx, "getItemFileValue> Getting file from buffer")
213+
ignoreBuffer := false
214+
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", itemUnit.ID)
215+
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
214216
if err != nil {
215-
return nil, nil, nil, err
217+
log.Error(ctx, "unable to get lock for %s", lockKey)
218+
ignoreBuffer = true
219+
}
220+
if hasLocked {
221+
// Reload to be sure that it's not marked as delete
222+
_, err := storage.LoadItemUnitByID(ctx, s.Mapper, s.mustDBWithCtx(ctx), itemUnit.ID)
223+
if err != nil {
224+
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
225+
log.Error(ctx, "unable to load item unit: %v", err)
226+
}
227+
ignoreBuffer = true
228+
}
229+
230+
if !ignoreBuffer {
231+
readerKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "reader", itemUnit.ID, sdk.UUID())
232+
if err := s.Cache.SetWithTTL(readerKey, true, 30); err != nil {
233+
log.Error(ctx, "unable to set reader on file buffer: %v", err)
234+
ignoreBuffer = true
235+
}
236+
}
237+
238+
if err := s.Cache.Unlock(lockKey); err != nil {
239+
log.Error(ctx, "unable to release lock for %s", lockKey)
240+
}
241+
} else {
242+
ignoreBuffer = true
216243
}
217-
return itemUnit, s.Units.FileBuffer(), rc, nil
244+
245+
if !ignoreBuffer {
246+
rc, err := s.Units.FileBuffer().NewReader(ctx, *itemUnit)
247+
if err != nil {
248+
return nil, nil, nil, err
249+
}
250+
return itemUnit, s.Units.FileBuffer(), rc, nil
251+
}
252+
218253
}
219254

220255
// Get from storage
@@ -349,12 +384,13 @@ func (s *Service) getRandomItemUnitIDByItemID(ctx context.Context, itemID string
349384
return "", "", err
350385
}
351386

387+
itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits)
388+
itemUnits = s.Units.FilterItemUnitFromBuffer(itemUnits)
389+
352390
if len(itemUnits) == 0 {
353391
return "", "", sdk.WithStack(fmt.Errorf("unable to find item units for item with id: %s", itemID))
354392
}
355393

356-
itemUnits = s.Units.FilterItemUnitReaderByType(itemUnits)
357-
358394
var unit *sdk.CDNUnit
359395
var selectedItemUnit *sdk.CDNItemUnit
360396
if defaultUnitName != "" {

engine/cdn/cdn_item_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/rockbears/log"
1414
"github.com/stretchr/testify/require"
1515

16+
"github.com/ovh/cds/engine/cache"
1617
"github.com/ovh/cds/engine/cdn/item"
1718
"github.com/ovh/cds/engine/cdn/lru"
1819
"github.com/ovh/cds/engine/cdn/redis"
@@ -509,3 +510,54 @@ func TestGetItemValue_ThousandLinesReverse(t *testing.T) {
509510
require.Equal(t, int64(0), lines[226].Number)
510511
require.Equal(t, "Line 0\n", lines[226].Value)
511512
}
513+
514+
func TestGetFileItemFromBuffer(t *testing.T) {
515+
m := gorpmapper.New()
516+
item.InitDBMapping(m)
517+
storage.InitDBMapping(m)
518+
519+
log.Factory = log.NewTestingWrapper(t)
520+
db, factory, store, cancel := test.SetupPGToCancel(t, m, sdk.TypeCDN)
521+
t.Cleanup(cancel)
522+
523+
cdntest.ClearItem(t, context.TODO(), m, db)
524+
cdntest.ClearSyncRedisSet(t, store, "local_storage")
525+
526+
// Create cdn service
527+
s := Service{
528+
DBConnectionFactory: factory,
529+
Cache: store,
530+
Mapper: m,
531+
}
532+
s.GoRoutines = sdk.NewGoRoutines(context.TODO())
533+
534+
ctx, cancel := context.WithCancel(context.TODO())
535+
t.Cleanup(cancel)
536+
537+
_ = test.LoadTestingConf(t, sdk.TypeCDN)
538+
cdnUnits := newRunningStorageUnits(t, m, s.DBConnectionFactory.GetDBMap(m)(), ctx, store)
539+
s.Units = cdnUnits
540+
541+
// Add Item in redis / fs/ cds -will be delete from redis
542+
it := sdk.CDNItem{
543+
ID: sdk.UUID(),
544+
Type: sdk.CDNTypeItemRunResult,
545+
Status: sdk.CDNStatusItemCompleted,
546+
APIRefHash: sdk.RandomString(10),
547+
}
548+
require.NoError(t, item.Insert(context.TODO(), s.Mapper, db, &it))
549+
iuFileBuf := sdk.CDNItemUnit{UnitID: s.Units.FileBuffer().ID(), ItemID: it.ID, Type: it.Type}
550+
require.NoError(t, storage.InsertItemUnit(context.TODO(), s.Mapper, db, &iuFileBuf))
551+
552+
// IU locked by gc
553+
lockKey := cache.Key(storage.FileBufferKey, s.Units.FileBuffer().ID(), "lock", iuFileBuf.ID)
554+
hasLocked, err := s.Cache.Lock(lockKey, 5*time.Second, 0, 1)
555+
require.NoError(t, err)
556+
require.True(t, hasLocked)
557+
558+
ui, unit, reader, err := s.getItemFileValue(ctx, sdk.CDNTypeItemRunResult, it.APIRefHash, getItemFileOptions{})
559+
require.Nil(t, reader)
560+
require.Nil(t, ui)
561+
require.Nil(t, unit)
562+
require.Contains(t, err.Error(), "unable to find item units for item with id: "+it.ID)
563+
}

engine/cdn/storage/storageunit.go

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ import (
2020
cdslog "github.com/ovh/cds/sdk/log"
2121
)
2222

23+
var (
24+
FileBufferKey = cache.Key("cdn", "unit")
25+
)
26+
2327
func (r RunningStorageUnits) Storage(name string) StorageUnit {
2428
for _, x := range r.Storages {
2529
if x.Name() == name {

engine/cdn/storage/types.go

+20
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,17 @@ func (x RunningStorageUnits) GetBuffer(bufferType sdk.CDNItemType) BufferUnit {
286286
}
287287
}
288288

289+
func (x *RunningStorageUnits) FilterItemUnitFromBuffer(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit {
290+
itemsUnits := make([]sdk.CDNItemUnit, 0, len(ius))
291+
for _, u := range ius {
292+
if x.IsBuffer(u.UnitID) {
293+
continue
294+
}
295+
itemsUnits = append(itemsUnits, u)
296+
}
297+
return itemsUnits
298+
}
299+
289300
func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit) []sdk.CDNItemUnit {
290301
// Remove cds backend from getting something that is not a log
291302
if ius[0].Type != sdk.CDNTypeItemStepLog && ius[0].Type != sdk.CDNTypeItemServiceLog {
@@ -307,6 +318,15 @@ func (x *RunningStorageUnits) FilterItemUnitReaderByType(ius []sdk.CDNItemUnit)
307318
return ius
308319
}
309320

321+
func (x *RunningStorageUnits) IsBuffer(id string) bool {
322+
for _, buf := range x.Buffers {
323+
if buf.ID() == id {
324+
return true
325+
}
326+
}
327+
return false
328+
}
329+
310330
type LogConfig struct {
311331
// Step logs
312332
StepMaxSize int64 `toml:"stepMaxSize" default:"15728640" comment:"Max step logs size in bytes (default: 15MB)" json:"stepMaxSize"`

0 commit comments

Comments
 (0)