Skip to content

Commit 3348ec6

Browse files
authored
fix(cdn): add deduplication by item type (#5804)
1 parent 06b65d2 commit 3348ec6

File tree

6 files changed

+211
-7
lines changed

6 files changed

+211
-7
lines changed

engine/cdn/storage/dao.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,10 @@ func LoadItemUnitsByUnit(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlE
167167
return getAllItemUnits(ctx, m, db, query, opts...)
168168
}
169169

170-
func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLocator string) (bool, error) {
171-
query := "SELECT id FROM storage_unit_item WHERE unit_id = $1 AND hash_locator = $2 AND to_delete = false LIMIT 1"
170+
func HasItemUnitsByUnitAndHashLocator(db gorp.SqlExecutor, unitID string, hashLocator string, itemType sdk.CDNItemType) (bool, error) {
171+
query := "SELECT id FROM storage_unit_item WHERE unit_id = $1 AND hash_locator = $2 AND type = $3 AND to_delete = false LIMIT 1"
172172
var ids []string
173-
_, err := db.Select(&ids, query, unitID, hashLocator)
173+
_, err := db.Select(&ids, query, unitID, hashLocator, itemType)
174174
return len(ids) > 0, sdk.WithStack(err)
175175
}
176176

engine/cdn/storage/storage_unit_io.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ func (r RunningStorageUnits) NewSource(ctx context.Context, refItemUnit sdk.CDNI
123123
return &iuSource{iu: refItemUnit, source: unit}, nil
124124
}
125125

126-
func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(locator string, unitID string) (bool, error) {
126+
func (r RunningStorageUnits) GetItemUnitByLocatorByUnit(locator string, unitID string, itemType sdk.CDNItemType) (bool, error) {
127127
// Load all the itemUnit for the unit and the same hashLocator
128128
hashLocator := r.HashLocator(locator)
129-
return HasItemUnitsByUnitAndHashLocator(r.db, unitID, hashLocator)
129+
return HasItemUnitsByUnitAndHashLocator(r.db, unitID, hashLocator, itemType)
130130
}

engine/cdn/storage/storageunit_purge.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func (x *RunningStorageUnits) Purge(ctx context.Context, s Interface) error {
4343
var hasItemUnit bool
4444
if _, hasLocator := s.(StorageUnitWithLocator); hasLocator {
4545
var err error
46-
hasItemUnit, err = x.GetItemUnitByLocatorByUnit(ui.Locator, s.ID())
46+
hasItemUnit, err = x.GetItemUnitByLocatorByUnit(ui.Locator, s.ID(), ui.Type)
4747
if err != nil {
4848
return err
4949
}

engine/cdn/storage/storageunit_run.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (x *RunningStorageUnits) runItem(ctx context.Context, tx gorpmapper.SqlExec
120120
}
121121

122122
// Check if the content (based on the locator) is already known from the destination unit
123-
has, err := x.GetItemUnitByLocatorByUnit(iu.Locator, dest.ID())
123+
has, err := x.GetItemUnitByLocatorByUnit(iu.Locator, dest.ID(), iu.Type)
124124
if err != nil {
125125
return err
126126
}

engine/cdn/storage/storageunit_test.go

+201
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
package storage_test
22

33
import (
4+
"bufio"
45
"bytes"
56
"context"
7+
"crypto/md5"
8+
"crypto/sha512"
9+
"encoding/hex"
10+
"io"
611
"io/ioutil"
12+
"os"
713
"testing"
814
"time"
915

@@ -15,12 +21,207 @@ import (
1521
"github.com/ovh/cds/engine/cdn/storage"
1622
_ "github.com/ovh/cds/engine/cdn/storage/local"
1723
_ "github.com/ovh/cds/engine/cdn/storage/redis"
24+
_ "github.com/ovh/cds/engine/cdn/storage/swift"
1825
cdntest "github.com/ovh/cds/engine/cdn/test"
1926
"github.com/ovh/cds/engine/gorpmapper"
2027
commontest "github.com/ovh/cds/engine/test"
2128
"github.com/ovh/cds/sdk"
2229
)
2330

31+
func TestDeduplicationCrossType(t *testing.T) {
32+
m := gorpmapper.New()
33+
item.InitDBMapping(m)
34+
storage.InitDBMapping(m)
35+
36+
db, cache := commontest.SetupPGWithMapper(t, m, sdk.TypeCDN)
37+
cfg := commontest.LoadTestingConf(t, sdk.TypeCDN)
38+
39+
cdntest.ClearItem(t, context.TODO(), m, db)
40+
cdntest.ClearSyncRedisSet(t, cache, "local_storage")
41+
42+
ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
43+
t.Cleanup(cancel)
44+
45+
bufferDir, err := ioutil.TempDir("", t.Name()+"-cdnbuffer-1-*")
46+
require.NoError(t, err)
47+
tmpDir, err := ioutil.TempDir("", t.Name()+"-cdn-1-*")
48+
require.NoError(t, err)
49+
50+
cdnUnits, err := storage.Init(ctx, m, cache, db.DbMap, sdk.NewGoRoutines(), storage.Configuration{
51+
SyncSeconds: 10,
52+
SyncNbElements: 100,
53+
HashLocatorSalt: "thisismysalt",
54+
Buffers: map[string]storage.BufferConfiguration{
55+
"redis_buffer": {
56+
Redis: &storage.RedisBufferConfiguration{
57+
Host: cfg["redisHost"],
58+
Password: cfg["redisPassword"],
59+
},
60+
BufferType: storage.CDNBufferTypeLog,
61+
},
62+
"fs_buffer": {
63+
Local: &storage.LocalBufferConfiguration{
64+
Path: bufferDir,
65+
},
66+
BufferType: storage.CDNBufferTypeFile,
67+
},
68+
},
69+
Storages: map[string]storage.StorageConfiguration{
70+
"local_storage": {
71+
Local: &storage.LocalStorageConfiguration{
72+
Path: tmpDir,
73+
Encryption: []convergent.ConvergentEncryptionConfig{
74+
{
75+
Cipher: aesgcm.CipherName,
76+
LocatorSalt: "secret_locator_salt",
77+
SecretValue: "secret_value",
78+
},
79+
},
80+
},
81+
},
82+
},
83+
})
84+
require.NoError(t, err)
85+
require.NotNil(t, cdnUnits)
86+
cdnUnits.Start(ctx, sdk.NewGoRoutines())
87+
88+
units, err := storage.LoadAllUnits(ctx, m, db.DbMap)
89+
require.NoError(t, err)
90+
require.NotNil(t, units)
91+
require.NotEmpty(t, units)
92+
93+
// Create Item Empty Log
94+
apiRef := &sdk.CDNLogAPIRef{
95+
ProjectKey: sdk.RandomString(5),
96+
}
97+
apiRefHash, err := apiRef.ToHash()
98+
require.NoError(t, err)
99+
100+
i := &sdk.CDNItem{
101+
APIRef: apiRef,
102+
APIRefHash: apiRefHash,
103+
Created: time.Now(),
104+
Type: sdk.CDNTypeItemStepLog,
105+
Status: sdk.CDNStatusItemIncoming,
106+
}
107+
require.NoError(t, item.Insert(ctx, m, db, i))
108+
defer func() {
109+
_ = item.DeleteByID(db, i.ID)
110+
}()
111+
112+
itemUnit, err := cdnUnits.NewItemUnit(ctx, cdnUnits.LogsBuffer(), i)
113+
require.NoError(t, err)
114+
require.NoError(t, storage.InsertItemUnit(ctx, m, db, itemUnit))
115+
itemUnit, err = storage.LoadItemUnitByID(ctx, m, db, itemUnit.ID, gorpmapper.GetOptions.WithDecryption)
116+
require.NoError(t, err)
117+
reader, err := cdnUnits.LogsBuffer().NewReader(context.TODO(), *itemUnit)
118+
require.NoError(t, err)
119+
h, err := convergent.NewHash(reader)
120+
require.NoError(t, err)
121+
i.Hash = h
122+
i.Status = sdk.CDNStatusItemCompleted
123+
require.NoError(t, item.Update(ctx, m, db, i))
124+
require.NoError(t, err)
125+
i, err = item.LoadByID(ctx, m, db, i.ID, gorpmapper.GetOptions.WithDecryption)
126+
require.NoError(t, err)
127+
128+
localUnit, err := storage.LoadUnitByName(ctx, m, db, "local_storage")
129+
require.NoError(t, err)
130+
131+
localUnitDriver := cdnUnits.Storage(localUnit.Name)
132+
require.NotNil(t, localUnitDriver)
133+
134+
// Sync item in backend
135+
require.NoError(t, cdnUnits.FillWithUnknownItems(ctx, cdnUnits.Storages[0], 100))
136+
require.NoError(t, cdnUnits.FillSyncItemChannel(ctx, cdnUnits.Storages[0], 100))
137+
time.Sleep(1 * time.Second)
138+
139+
<-ctx.Done()
140+
141+
// Check that the first unit has been resync
142+
exists, err := localUnitDriver.ItemExists(context.TODO(), m, db, *i)
143+
require.NoError(t, err)
144+
require.True(t, exists)
145+
146+
logItemUnit, err := storage.LoadItemUnitByUnit(ctx, m, db, localUnitDriver.ID(), i.ID, gorpmapper.GetOptions.WithDecryption)
147+
require.NoError(t, err)
148+
149+
// Add empty artifact
150+
// Create Item Empty Log
151+
apiRefArtifact := &sdk.CDNRunResultAPIRef{
152+
ProjectKey: sdk.RandomString(5),
153+
ArtifactName: "myfile.txt",
154+
Perm: 0777,
155+
}
156+
apiRefHashArtifact, err := apiRefArtifact.ToHash()
157+
require.NoError(t, err)
158+
159+
itemArtifact := &sdk.CDNItem{
160+
APIRef: apiRefArtifact,
161+
APIRefHash: apiRefHashArtifact,
162+
Created: time.Now(),
163+
Type: sdk.CDNTypeItemRunResult,
164+
Status: sdk.CDNStatusItemCompleted,
165+
}
166+
iuArtifact, err := cdnUnits.NewItemUnit(ctx, cdnUnits.Buffers[1], itemArtifact)
167+
require.NoError(t, err)
168+
169+
// Create Destination Writer
170+
writer, err := cdnUnits.FileBuffer().NewWriter(ctx, *iuArtifact)
171+
require.NoError(t, err)
172+
173+
// Compute md5 and sha512
174+
artifactContent := make([]byte, 0)
175+
artifactReader := bytes.NewReader(artifactContent)
176+
md5Hash := md5.New()
177+
sha512Hash := sha512.New()
178+
pagesize := os.Getpagesize()
179+
mreader := bufio.NewReaderSize(artifactReader, pagesize)
180+
multiWriter := io.MultiWriter(md5Hash, sha512Hash)
181+
teeReader := io.TeeReader(mreader, multiWriter)
182+
183+
require.NoError(t, cdnUnits.FileBuffer().Write(*iuArtifact, teeReader, writer))
184+
sha512S := hex.EncodeToString(sha512Hash.Sum(nil))
185+
md5S := hex.EncodeToString(md5Hash.Sum(nil))
186+
187+
itemArtifact.Hash = sha512S
188+
itemArtifact.MD5 = md5S
189+
itemArtifact.Size = 0
190+
itemArtifact.Status = sdk.CDNStatusItemCompleted
191+
iuArtifact, err = cdnUnits.NewItemUnit(ctx, cdnUnits.Buffers[1], itemArtifact)
192+
require.NoError(t, err)
193+
require.NoError(t, item.Insert(ctx, m, db, itemArtifact))
194+
defer func() {
195+
_ = item.DeleteByID(db, itemArtifact.ID)
196+
}()
197+
// Insert Item Unit
198+
iuArtifact.ItemID = iuArtifact.Item.ID
199+
require.NoError(t, storage.InsertItemUnit(ctx, m, db, iuArtifact))
200+
201+
// Check if the content (based on the locator) is already known from the destination unit
202+
has, err := cdnUnits.GetItemUnitByLocatorByUnit(logItemUnit.Locator, cdnUnits.Storages[0].ID(), iuArtifact.Type)
203+
require.NoError(t, err)
204+
require.False(t, has)
205+
206+
require.NoError(t, cdnUnits.FillWithUnknownItems(ctx, cdnUnits.Storages[0], 100))
207+
require.NoError(t, cdnUnits.FillSyncItemChannel(ctx, cdnUnits.Storages[0], 100))
208+
time.Sleep(1 * time.Second)
209+
210+
<-ctx.Done()
211+
212+
// Check that the first unit has been resync
213+
exists, err = localUnitDriver.ItemExists(context.TODO(), m, db, *itemArtifact)
214+
require.NoError(t, err)
215+
require.True(t, exists)
216+
217+
artItemUnit, err := storage.LoadItemUnitByUnit(ctx, m, db, localUnitDriver.ID(), itemArtifact.ID, gorpmapper.GetOptions.WithDecryption)
218+
require.NoError(t, err)
219+
220+
require.Equal(t, logItemUnit.HashLocator, artItemUnit.HashLocator)
221+
require.NotEqual(t, logItemUnit.Type, artItemUnit.Type)
222+
223+
}
224+
24225
func TestRun(t *testing.T) {
25226
m := gorpmapper.New()
26227
item.InitDBMapping(m)

ui/src/app/views/workflow/run/node/artifact/artifact.list.component.ts

+3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ export class WorkflowRunArtifactListComponent implements OnInit, OnDestroy {
104104
}
105105

106106
getHumainFileSize(size: number): string {
107+
if (size === 0) {
108+
return '0B';
109+
}
107110
let i = Math.floor(Math.log(size) / Math.log(1024));
108111
let hSize = (size / Math.pow(1024, i)).toFixed(2);
109112
return hSize + ' ' + ['B', 'kB', 'MB', 'GB', 'TB'][i];

0 commit comments

Comments
 (0)