Skip to content

Commit

Permalink
feat(backend): Remove PipelineSpec Template storage from ObjStore res…
Browse files Browse the repository at this point in the history
…ponsibilies. Fixes #10509 (#10790)

* feat(backend): Remove PipelineSpec Template storage from ObjStore responsibilies. Fixes #10509

Signed-off-by: Giulio Frasca <[email protected]>

* chore: Remove BadObjStore unit tests (no longer applicable)

Signed-off-by: Giulio Frasca <[email protected]>

* test: Update backend unit tests to not retrieve PipelineSpec from mock ObjStore

- Add PipelineSpec to mock PVs as they are no longer retrieved from
  ObjStore

Signed-off-by: Giulio Frasca <[email protected]>

---------

Signed-off-by: Giulio Frasca <[email protected]>
  • Loading branch information
gmfrasca authored Oct 3, 2024
1 parent f4cdbeb commit 374b18b
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 132 deletions.
52 changes: 2 additions & 50 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,6 @@ func (r *ResourceManager) CreatePipelineAndPipelineVersion(p *model.Pipeline, pv
return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version")
}

// TODO(gkcalat): consider removing this after v2beta1 GA if we adopt storing PipelineSpec in DB.
// Store the pipeline file
err = r.objectStore.AddFile(tmpl.Bytes(), r.objectStore.GetPipelineKey(newVersion.UUID))
if err != nil {
return nil, nil, util.Wrap(err, "Failed to create a pipeline and a pipeline version due to error saving PipelineSpec to ObjectStore")
}

newPipeline.Status = model.PipelineReady
err = r.pipelineStore.UpdatePipelineStatus(
newPipeline.UUID,
Expand Down Expand Up @@ -1546,13 +1539,6 @@ func (r *ResourceManager) CreatePipelineVersion(pv *model.PipelineVersion) (*mod
return nil, util.Wrap(err, "Failed to create pipeline version in PipelineStore")
}

// TODO(gkcalat): consider removing this after v2beta1 GA if we adopt storing PipelineSpec in DB.
// Store the pipeline file
err = r.objectStore.AddFile(tmpl.Bytes(), r.objectStore.GetPipelineKey(fmt.Sprint(version.UUID)))
if err != nil {
return nil, util.Wrap(err, "Failed to create a pipeline version due to error saving PipelineSpec to ObjectStore")
}

// After pipeline version being created in DB and pipeline file being
// saved in minio server, set this pieline version to status ready.
version.Status = model.PipelineVersionReady
Expand Down Expand Up @@ -1600,7 +1586,7 @@ func (r *ResourceManager) ListPipelineVersions(pipelineId string, opts *list.Opt
// Deletes a pipeline version and the corresponding PipelineSpec.
func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error {
// Check if pipeline version exists
pipelineVersion, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId)
_, err := r.pipelineStore.GetPipelineVersion(pipelineVersionId)
if err != nil {
return util.Wrapf(err, "Failed to delete pipeline version with id %v as it was not found", pipelineVersionId)
}
Expand All @@ -1621,41 +1607,7 @@ func (r *ResourceManager) DeletePipelineVersion(pipelineVersionId string) error
// either using async deletion in order for this method to be non-blocking
// or or exploring other performance optimization tools provided by gcs.
//
// TODO(gkcalat): consider removing this if we switch to storing PipelineSpec in DB.
// DeleteObject always responds with http '204' even for
// objects which do not exist. The err below will be nil.
//
// Delete based on pipeline spec URI
pipelineSpecRemoved := false
var osErr error
err = r.objectStore.DeleteFile(pipelineVersion.PipelineSpecURI)
if err != nil {
glog.Errorf("%v", util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v with URI %v", pipelineVersionId, pipelineVersion.PipelineSpecURI))
osErr = util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v with URI %v", pipelineVersionId, pipelineVersion.PipelineSpecURI)
} else {
pipelineSpecRemoved = true
}
// Delete based on pipeline version id
err = r.objectStore.DeleteFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersionId)))
if err != nil {
glog.Errorf("%v", util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v", pipelineVersionId))
err = util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v", pipelineVersionId)
osErr = util.Wrap(osErr, err.Error())
} else {
pipelineSpecRemoved = true
}
// Delete based on pipeline id
err = r.objectStore.DeleteFile(r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
if err != nil {
glog.Errorf("%v", util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v using pipeline id %v", pipelineVersionId, pipelineVersion.PipelineId))
err = util.Wrapf(err, "Failed to delete pipeline spec for pipeline version id %v using pipeline id %v", pipelineVersionId, pipelineVersion.PipelineId)
osErr = util.Wrap(osErr, err.Error())
} else {
pipelineSpecRemoved = true
}
if !pipelineSpecRemoved {
return util.Wrap(osErr, "Failed to delete a pipeline spec")
}

// Delete the DB entry
err = r.pipelineStore.DeletePipelineVersion(pipelineVersionId)
if err != nil {
Expand Down
41 changes: 3 additions & 38 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,24 +388,6 @@ func TestCreatePipeline(t *testing.T) {
name: "complex",
model: createPipeline("complex", "", "user1"),
},
{
msg: "BadObjectStore",
badObjectStore: true,
template: testWorkflow.ToStringForStore(),
errorCode: codes.Internal,
errorMsg: "bad object store",
model: createPipeline("BadOS", "", "user1"),
// We previously verified that the failed pipeline version
// in DB is in status PipelineVersionCreating by faking
// the UUID generator, so that we know the created version
// UUID in advance.
// We cannot verify it using public APIs,
// because the API does not expose them unless we know its UUID, but we
// cannot know its UUID when create version request failed.
// TODO: do we really need to verify this status? or should
// the create version request return a UUID when the
// pipeline version fails in PipelineVersionCreating state.
},
{
msg: "InvalidTemplate",
template: "I am invalid yaml",
Expand Down Expand Up @@ -536,23 +518,6 @@ func TestCreatePipelineVersion(t *testing.T) {
PipelineSpec: complexPipeline,
},
},
{
msg: "BadObjectStore",
badObjectStore: true,
template: testWorkflow.ToStringForStore(),
errorCode: codes.Internal,
errorMsg: "bad object store",
// We previously verified that the failed pipeline version
// in DB is in status PipelineVersionCreating by faking
// the UUID generator, so that we know the created version
// UUID in advance.
// We cannot verify it using public APIs,
// because the API does not expose them unless we know its UUID, but we
// cannot know its UUID when create version request failed.
// TODO: do we really need to verify this status? or should
// the create version request return a UUID when the
// pipeline version fails in PipelineVersionCreating state.
},
{
msg: "InvalidTemplate",
template: "I am invalid yaml",
Expand Down Expand Up @@ -1530,12 +1495,12 @@ func TestDeletePipelineVersion_FileError(t *testing.T) {

// Delete the above pipeline_version.
err = manager.DeletePipelineVersion(FakeUUIDOne)
assert.NotNil(t, err)
assert.Nil(t, err)

// Verify the version in deleting status.
version, err := manager.pipelineStore.GetPipelineVersionWithStatus(FakeUUIDOne, model.PipelineVersionDeleting)
assert.Nil(t, err)
assert.NotNil(t, version)
assert.NotNil(t, err)
assert.Nil(t, version)
}

// Tests DeletePipeline
Expand Down
10 changes: 6 additions & 4 deletions backend/src/apiserver/server/fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ func initWithExperimentsAndTwoPipelineVersions(t *testing.T) *resource.FakeClien
resourceManager = resource.NewResourceManager(clientManager, &resource.ResourceManagerOptions{CollectMetrics: false})
_, err = resourceManager.CreatePipelineVersion(
&model.PipelineVersion{
Name: "pipeline_version",
PipelineId: DefaultFakeUUID,
Name: "pipeline_version",
PipelineId: DefaultFakeUUID,
PipelineSpec: "apiVersion: argoproj.io/v1alpha1\nkind: Workflow",
},
)
assert.Nil(t, err)
Expand Down Expand Up @@ -302,8 +303,9 @@ func initWithExperimentsAndTwoPipelineVersions(t *testing.T) *resource.FakeClien
resourceManager = resource.NewResourceManager(clientManager, &resource.ResourceManagerOptions{CollectMetrics: false})
_, err = resourceManager.CreatePipelineVersion(
&model.PipelineVersion{
Name: "another_pipeline_version",
PipelineId: NonDefaultFakeUUID,
Name: "another_pipeline_version",
PipelineId: NonDefaultFakeUUID,
PipelineSpec: "apiVersion: argoproj.io/v1alpha1\nkind: Workflow",
},
)
assert.Nil(t, err)
Expand Down
41 changes: 1 addition & 40 deletions backend/src/apiserver/server/pipeline_upload_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,6 @@ func TestUploadPipeline(t *testing.T) {
assert.Equal(t, "123e4567-e89b-12d3-a456-426655440000", parsedResponse.PipelineID)
}

// Verify stored in object store
objStore := clientManager.ObjectStore()
template, err := objStore.GetFile(objStore.GetPipelineKey(DefaultFakeUUID))
assert.Nil(t, err)
assert.NotNil(t, template)

opts, err := list.NewOptions(&model.Pipeline{}, 2, "", nil)
assert.Nil(t, err)

Expand Down Expand Up @@ -159,11 +153,6 @@ func TestUploadPipeline(t *testing.T) {
assert.Equal(t, 200, response.Code)
assert.Contains(t, response.Body.String(), `"created_at":"1970-01-01T00:00:03Z"`)

// Verify stored in object store
objStore = clientManager.ObjectStore()
template, err = objStore.GetFile(objStore.GetPipelineKey(fakeVersionUUID))
assert.Nil(t, err)
assert.NotNil(t, template)
opts, err = list.NewOptions(&model.PipelineVersion{}, 2, "", nil)
assert.Nil(t, err)

Expand Down Expand Up @@ -256,7 +245,7 @@ func TestUploadPipelineV2_NameValidation(t *testing.T) {
}
for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
clientManager, server := setupClientManagerAndServer()
_, server := setupClientManagerAndServer()
bytesBuffer, writer := setupWriter("")
setWriterWithBuffer("uploadfile", "hello-world.yaml", string(test.spec), writer)
response := uploadPipeline("/apis/v2beta1/pipelines/upload",
Expand Down Expand Up @@ -284,12 +273,6 @@ func TestUploadPipelineV2_NameValidation(t *testing.T) {
// Verify v1 API returns v1 object while v2 API returns v2 object.
assert.Equal(t, "", parsedResponse.ID)
assert.Equal(t, "123e4567-e89b-12d3-a456-426655440000", parsedResponse.PipelineID)

// Verify stored in object store
objStore := clientManager.ObjectStore()
template, err := objStore.GetFile(objStore.GetPipelineKey(DefaultFakeUUID))
assert.Nil(t, err)
assert.NotNil(t, template)
}
})
}
Expand All @@ -306,12 +289,6 @@ func TestUploadPipeline_Tarball(t *testing.T) {
// Verify time format is RFC3339
assert.Contains(t, response.Body.String(), `"created_at":"1970-01-01T00:00:01Z"`)

// Verify stored in object store
objStore := clientManager.ObjectStore()
template, err := objStore.GetFile(objStore.GetPipelineKey(DefaultFakeUUID))
assert.Nil(t, err)
assert.NotNil(t, template)

opts, err := list.NewOptions(&model.Pipeline{}, 2, "", nil)
assert.Nil(t, err)
// Verify metadata in db
Expand Down Expand Up @@ -359,11 +336,6 @@ func TestUploadPipeline_Tarball(t *testing.T) {
assert.Equal(t, 200, response.Code)
assert.Contains(t, response.Body.String(), `"created_at":"1970-01-01T00:00:03Z"`)

// Verify stored in object store
objStore = clientManager.ObjectStore()
template, err = objStore.GetFile(objStore.GetPipelineKey(fakeVersionUUID))
assert.Nil(t, err)
assert.NotNil(t, template)
opts, err = list.NewOptions(&model.PipelineVersion{}, 2, "", nil)
assert.Nil(t, err)
// Verify metadata in db
Expand Down Expand Up @@ -414,12 +386,6 @@ func TestUploadPipeline_SpecifyFileName(t *testing.T) {
bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipeline)
assert.Equal(t, 200, response.Code)

// Verify stored in object store
objStore := clientManager.ObjectStore()
template, err := objStore.GetFile(objStore.GetPipelineKey(DefaultFakeUUID))
assert.Nil(t, err)
assert.NotNil(t, template)

opts, err := list.NewOptions(&model.Pipeline{}, 2, "", nil)
assert.Nil(t, err)
// Verify metadata in db
Expand Down Expand Up @@ -467,11 +433,6 @@ func TestUploadPipeline_SpecifyFileDescription(t *testing.T) {
bytes.NewReader(bytesBuffer.Bytes()), writer, server.UploadPipeline)
assert.Equal(t, 200, response.Code)

// Verify stored in object store
objStore := clientManager.ObjectStore()
template, err := objStore.GetFile(objStore.GetPipelineKey(DefaultFakeUUID))
assert.Nil(t, err)
assert.NotNil(t, template)
opts, err := list.NewOptions(&model.Pipeline{}, 2, "", nil)
assert.Nil(t, err)

Expand Down

0 comments on commit 374b18b

Please sign in to comment.