Skip to content

Commit a0920e1

Browse files
authored
fix(cdn): clean old worker cache items (#5856)
1 parent 0c1946b commit a0920e1

File tree

3 files changed

+58
-30
lines changed

3 files changed

+58
-30
lines changed

engine/cdn/cdn_file.go

+33-17
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io"
1111
"net/http"
1212
"os"
13+
"sort"
1314

1415
"github.com/ovh/cds/engine/cdn/item"
1516
"github.com/ovh/cds/engine/cdn/storage"
@@ -126,11 +127,6 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
126127
}
127128
defer tx.Rollback() //nolint
128129

129-
// Check and Clean file with same ref
130-
if err := s.cleanPreviousFileItem(ctx, tx, sig, itemType, apiRef.ToFilename()); err != nil {
131-
return err
132-
}
133-
134130
// Insert Item
135131
if err := item.Insert(ctx, s.Mapper, tx, it); err != nil {
136132
return err
@@ -186,22 +182,42 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re
186182
}
187183

188184
s.Units.PushInSyncQueue(ctx, it.ID, it.Created)
185+
186+
// For a worker cache item, mark the other items with the same ref for deletion to purge old cached data
187+
if itemType == sdk.CDNTypeItemWorkerCache {
188+
tx, err := s.mustDBWithCtx(ctx).Begin()
189+
if err != nil {
190+
return sdk.WithStack(err)
191+
}
192+
defer tx.Rollback() //nolint
193+
194+
if err := s.cleanPreviousCachedData(ctx, tx, sig, apiRef.ToFilename()); err != nil {
195+
return err
196+
}
197+
198+
if err := tx.Commit(); err != nil {
199+
return sdk.WithStack(err)
200+
}
201+
}
202+
189203
return nil
190204
}
191205

192-
func (s *Service) cleanPreviousFileItem(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, sig cdn.Signature, itemType sdk.CDNItemType, name string) error {
193-
switch itemType {
194-
case sdk.CDNTypeItemWorkerCache:
195-
// Check if item already exist
196-
existingItem, err := item.LoadFileByProjectAndCacheTag(ctx, s.Mapper, tx, itemType, sig.ProjectKey, name)
197-
if err != nil {
198-
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
199-
return err
200-
}
201-
return nil
206+
// Mark to delete all items for given cache tag except the most recent one.
207+
func (s *Service) cleanPreviousCachedData(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, sig cdn.Signature, cacheTag string) error {
208+
items, err := item.LoadWorkerCacheItemsByProjectAndCacheTag(ctx, s.Mapper, tx, sig.ProjectKey, cacheTag)
209+
if err != nil {
210+
return err
211+
}
212+
213+
sort.Slice(items, func(i, j int) bool { return items[i].Created.Before(items[j].Created) })
214+
215+
for i := 0; i < len(items)-1; i++ {
216+
items[i].ToDelete = true
217+
if err := item.Update(ctx, s.Mapper, tx, &items[i]); err != nil {
218+
return err
202219
}
203-
existingItem.ToDelete = true
204-
return item.Update(ctx, s.Mapper, tx, existingItem)
205220
}
221+
206222
return nil
207223
}

engine/cdn/item/dao.go

+24-12
Original file line numberDiff line numberDiff line change
@@ -144,31 +144,44 @@ func Update(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.SqlExecutor
144144

145145
func MarkToDeleteByRunIDs(db gorpmapper.SqlExecutorWithTx, runID int64) error {
146146
query := `
147-
UPDATE item SET to_delete = true WHERE (api_ref->>'run_id')::int = $1
147+
UPDATE item SET to_delete = true WHERE (api_ref->>'run_id')::int = $1
148148
`
149149
_, err := db.Exec(query, runID)
150150
return sdk.WrapError(err, "unable to mark item to delete for run %d", runID)
151151
}
152152

153-
func LoadFileByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, projKey string, cacheTag string) (*sdk.CDNItem, error) {
153+
func LoadWorkerCacheItemByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, projKey string, cacheTag string) (*sdk.CDNItem, error) {
154154
query := gorpmapper.NewQuery(`
155155
SELECT *
156156
FROM item
157157
WHERE type = $1
158158
AND (api_ref->>'project_key')::text = $2
159159
AND (api_ref->>'cache_tag')::text = $3
160160
AND to_delete = false
161-
162-
`).Args(itemType, projKey, cacheTag)
161+
ORDER BY created DESC
162+
LIMIT 1
163+
`).Args(sdk.CDNTypeItemWorkerCache, projKey, cacheTag)
163164
return getItem(ctx, m, db, query)
164165
}
165166

167+
func LoadWorkerCacheItemsByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, projKey string, cacheTag string) ([]sdk.CDNItem, error) {
168+
query := gorpmapper.NewQuery(`
169+
SELECT *
170+
FROM item
171+
WHERE type = $1
172+
AND (api_ref->>'project_key')::text = $2
173+
AND (api_ref->>'cache_tag')::text = $3
174+
AND to_delete = false
175+
`).Args(sdk.CDNTypeItemWorkerCache, projKey, cacheTag)
176+
return getItems(ctx, m, db, query)
177+
}
178+
166179
// LoadByAPIRefHashAndType loads an item by its API ref hash and type
167180
func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, hash string, itemType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItem, error) {
168181
query := gorpmapper.NewQuery(`
169182
SELECT *
170183
FROM item
171-
WHERE api_ref_hash = $1
184+
WHERE api_ref_hash = $1
172185
AND type = $2
173186
AND to_delete = false
174187
`).Args(hash, itemType)
@@ -179,7 +192,7 @@ func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.
179192
func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) {
180193
query := `
181194
SELECT COALESCE(SUM(size), 0) FROM item
182-
WHERE id = ANY($1)
195+
WHERE id = ANY($1)
183196
`
184197
size, err := db.SelectInt(query, pq.StringArray(itemIDs))
185198
if err != nil {
@@ -191,9 +204,9 @@ func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) {
191204
func ListNodeRunByProject(db gorp.SqlExecutor, projectKey string) ([]int64, error) {
192205
var IDs []int64
193206
query := `
194-
SELECT
207+
SELECT
195208
DISTINCT((api_ref->>'node_run_id')::int)
196-
FROM item
209+
FROM item
197210
WHERE api_ref->>'project_key' = $1
198211
`
199212
_, err := db.Select(&IDs, query, projectKey)
@@ -206,7 +219,7 @@ func ListNodeRunByProject(db gorp.SqlExecutor, projectKey string) ([]int64, erro
206219
// ComputeSizeByProjectKey returns the size used by a project
207220
func ComputeSizeByProjectKey(db gorp.SqlExecutor, projectKey string) (int64, error) {
208221
query := `
209-
SELECT SUM(size) FROM item WHERE api_ref->>'project_key' = $1
222+
SELECT SUM(size) FROM item WHERE api_ref->>'project_key' = $1
210223
`
211224
size, err := db.SelectInt(query, projectKey)
212225
if err != nil {
@@ -223,15 +236,15 @@ type Stat struct {
223236

224237
func CountItems(db gorp.SqlExecutor) (res []Stat, err error) {
225238
_, err = db.Select(&res, `
226-
SELECT status, type, count(status) as "number"
239+
SELECT status, type, count(status) as "number"
227240
FROM item
228241
GROUP BY status, type`)
229242
return res, sdk.WithStack(err)
230243
}
231244

232245
func CountItemsToDelete(db gorp.SqlExecutor) (int64, error) {
233246
query := `SELECT count(1) as "number"
234-
FROM item
247+
FROM item
235248
WHERE to_delete = true`
236249
nb, err := db.SelectInt(query)
237250
return nb, sdk.WithStack(err)
@@ -334,5 +347,4 @@ func LoadRunResultByRunID(ctx context.Context, m *gorpmapper.Mapper, db gorp.Sql
334347
SELECT * FROM item WHERE id IN (SELECT id FROM deduplication)
335348
`).Args(runID, sdk.CDNTypeItemRunResult)
336349
return getItems(ctx, m, db, query)
337-
338350
}

engine/cdn/item_handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ func (s *Service) getWorkerCache(ctx context.Context, r *http.Request, w http.Re
265265
if projectKey == "" || cachetag == "" {
266266
return sdk.WrapError(sdk.ErrWrongRequest, "invalid data to get worker cache")
267267
}
268-
item, err := item.LoadFileByProjectAndCacheTag(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNTypeItemWorkerCache, projectKey, cachetag)
268+
item, err := item.LoadWorkerCacheItemByProjectAndCacheTag(ctx, s.Mapper, s.mustDBWithCtx(ctx), projectKey, cachetag)
269269
if err != nil {
270270
return err
271271
}

0 commit comments

Comments
 (0)