diff --git a/pkg/datastore/apikey.go b/pkg/datastore/apikey.go index 79af24f0c2..42ee294afb 100644 --- a/pkg/datastore/apikey.go +++ b/pkg/datastore/apikey.go @@ -16,6 +16,7 @@ package datastore import ( "context" + "encoding/json" "fmt" "time" @@ -51,6 +52,23 @@ func (a *apiKeyCollection) GetUpdatableShard() (Shard, error) { } } +func (a *apiKeyCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode APIKey object: %s" + + me, ok := e.(*model.APIKey) + if !ok { + return nil, fmt.Errorf("type not matched") + } + + data, err := json.Marshal(me) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + return map[Shard][]byte{ + ClientShard: data, + }, nil +} + type APIKeyStore interface { Add(ctx context.Context, k *model.APIKey) error Get(ctx context.Context, id string) (*model.APIKey, error) diff --git a/pkg/datastore/applicationstore.go b/pkg/datastore/applicationstore.go index 13bbcb7a51..bd63324483 100644 --- a/pkg/datastore/applicationstore.go +++ b/pkg/datastore/applicationstore.go @@ -16,6 +16,7 @@ package datastore import ( "context" + "encoding/json" "fmt" "time" @@ -54,6 +55,59 @@ func (a *applicationCollection) GetUpdatableShard() (Shard, error) { } } +func (a *applicationCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode Application object: %s" + + me, ok := e.(*model.Application) + if !ok { + return nil, fmt.Errorf(errFmt, "type not matched") + } + + // TODO: Find a way to generate function to build this kind of object by specifying a field tag in the proto file. + // For example: + // ```proto + // message Application { + // // The generated unique identifier. + // string id = 1 [(validate.rules).string.min_len = 1, shard=client]; + // ``` + clientShardStruct := model.Application{ + Id: me.Id, + ProjectId: me.ProjectId, + Kind: me.Kind, + GitPath: me.GitPath, + CloudProvider: me.CloudProvider, + Disabled: me.Disabled, + Deleted: me.Deleted, + DeletedAt: me.DeletedAt, + CreatedAt: me.CreatedAt, + UpdatedAt: me.UpdatedAt, + } + cdata, err := json.Marshal(&clientShardStruct) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + + agentShardStruct := model.Application{ + Name: me.Name, + Description: me.Description, + Labels: me.Labels, + SyncState: me.SyncState, + Deploying: me.Deploying, + MostRecentlySuccessfulDeployment: me.MostRecentlySuccessfulDeployment, + MostRecentlyTriggeredDeployment: me.MostRecentlyTriggeredDeployment, + UpdatedAt: me.UpdatedAt, + } + adata, err := json.Marshal(&agentShardStruct) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + + return map[Shard][]byte{ + ClientShard: cdata, + AgentShard: adata, + }, nil +} + type ApplicationStore interface { Add(ctx context.Context, app *model.Application) error Get(ctx context.Context, id string) (*model.Application, error) diff --git a/pkg/datastore/commandstore.go b/pkg/datastore/commandstore.go index 315faff977..ccfc82afcb 100644 --- a/pkg/datastore/commandstore.go +++ b/pkg/datastore/commandstore.go @@ -86,6 +86,35 @@ func (c *commandCollection) Decode(e interface{}, parts ...[]byte) error { return nil } +func (c *commandCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode Command object: %s" + + me, ok := e.(*model.Command) + if !ok { + return nil, fmt.Errorf(errFmt, "type not matched") + } + + agentShardStruct := me + adata, err := json.Marshal(&agentShardStruct) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + + opsShardStruct := model.Command{ + Status: me.Status, + UpdatedAt: me.UpdatedAt, + } + odata, err := json.Marshal(&opsShardStruct) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + + return map[Shard][]byte{ + AgentShard: adata, + OpsShard: odata, + }, nil +} + type CommandStore interface { Add(ctx context.Context, cmd *model.Command) error Get(ctx context.Context, id string) (*model.Command, error) diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index df578aac70..16ec32e218 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -97,6 +97,12 @@ type ShardStorable interface { GetUpdatableShard() (Shard, error) } +type ShardEncoder interface { + // Encode accepts an object as its input and returns a map which key is the shard and + // value is the raw data which should be stored under the key shard. + Encode(e interface{}) (map[Shard][]byte, error) +} + type ShardDecoder interface { // Decode unmarshals all given raw data parts to a given entity e. Decode(e interface{}, parts ...[]byte) error diff --git a/pkg/datastore/deploymentchainstore.go b/pkg/datastore/deploymentchainstore.go index 0ada4a0710..d86c350ebf 100644 --- a/pkg/datastore/deploymentchainstore.go +++ b/pkg/datastore/deploymentchainstore.go @@ -16,6 +16,7 @@ package datastore import ( "context" + "encoding/json" "fmt" "time" @@ -51,6 +52,23 @@ func (d *deploymentChainCollection) GetUpdatableShard() (Shard, error) { } } +func (d *deploymentChainCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode DeploymentChain object: %s" + + me, ok := e.(*model.DeploymentChain) + if !ok { + return nil, fmt.Errorf(errFmt, "type not matched") + } + + data, err := json.Marshal(me) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + return map[Shard][]byte{ + OpsShard: data, + }, nil +} + var ( addDeploymentToBlockUpdateFunc = func(deployment *model.Deployment) func(*model.DeploymentChain) error { return func(dc *model.DeploymentChain) error { diff --git a/pkg/datastore/deploymentstore.go b/pkg/datastore/deploymentstore.go index 8c03fe676f..394664341b 100644 --- a/pkg/datastore/deploymentstore.go +++ b/pkg/datastore/deploymentstore.go @@ -16,6 +16,7 @@ package datastore import ( "context" + "encoding/json" "fmt" "time" @@ -51,6 +52,23 @@ func (d *deploymentCollection) GetUpdatableShard() (Shard, error) { } } +func (d *deploymentCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode Deployment object: %s" + + me, ok := e.(*model.Deployment) + if !ok { + return nil, fmt.Errorf("type not matched") + } + + data, err := json.Marshal(me) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + return map[Shard][]byte{ + AgentShard: data, + }, nil +} + var ( toPlannedUpdateFunc = func(summary, statusReason, runningCommitHash, runningConfigFilename, version string, stages []*model.PipelineStage) func(*model.Deployment) error { return func(d *model.Deployment) error { diff --git a/pkg/datastore/eventstore.go b/pkg/datastore/eventstore.go index b13e131ee9..0b995c8518 100644 --- a/pkg/datastore/eventstore.go +++ b/pkg/datastore/eventstore.go @@ -16,6 +16,8 @@ package datastore import ( "context" + "encoding/json" + "fmt" "time" "github.com/pipe-cd/pipecd/pkg/model" @@ -50,6 +52,23 @@ func (e *eventCollection) GetUpdatableShard() (Shard, error) { } } +func (e *eventCollection) Encode(entity interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode Event object: %s" + + me, ok := entity.(*model.Event) + if !ok { + return nil, fmt.Errorf(errFmt, "type not matched") + } + + data, err := json.Marshal(me) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + return map[Shard][]byte{ + AgentShard: data, + }, nil +} + type EventStore interface { Add(ctx context.Context, e model.Event) error List(ctx context.Context, opts ListOptions) ([]*model.Event, string, error) diff --git a/pkg/datastore/filedb/codec.go b/pkg/datastore/filedb/codec.go index 1f8b03d835..05f49c509b 100644 --- a/pkg/datastore/filedb/codec.go +++ b/pkg/datastore/filedb/codec.go @@ -70,3 +70,15 @@ func merge(e interface{}, parts ...[]byte) error { me.SetUpdatedAt(latest) return nil } + +// encode checks for the given collection object. If the given collection +// implements the `datastore.ShardEncoder` interface, its implementation will +// be used. If not, `datastore.ErrUnsupported` error will be raised. +func encode(col datastore.Collection, e interface{}) (map[datastore.Shard][]byte, error) { + ecol, ok := col.(datastore.ShardEncoder) + if !ok { + return nil, datastore.ErrUnsupported + } + + return ecol.Encode(e) +} diff --git a/pkg/datastore/filedb/filedb.go b/pkg/datastore/filedb/filedb.go index 6fb6fa3797..343bc6b87b 100644 --- a/pkg/datastore/filedb/filedb.go +++ b/pkg/datastore/filedb/filedb.go @@ -99,11 +99,30 @@ func (f *FileDB) Get(ctx context.Context, col datastore.Collection, id string, v } func (f *FileDB) Create(ctx context.Context, col datastore.Collection, id string, entity interface{}) error { - _, ok := col.(datastore.ShardStorable) - if !ok { - return datastore.ErrUnsupported + kind := col.Kind() + sdata, err := encode(col, entity) + if err != nil { + f.logger.Error("failed to encode entity", + zap.String("id", id), + zap.String("kind", kind), + zap.Error(err), + ) + return err } - return datastore.ErrUnimplemented + + for shard, data := range sdata { + path := makeHotStorageFilePath(kind, id, shard) + if err = f.backend.Put(ctx, path, data); err != nil { + f.logger.Error("failed to store entity", + zap.String("id", id), + zap.String("kind", kind), + zap.Error(err), + ) + return err + } + } + + return nil } func (f *FileDB) Update(ctx context.Context, col datastore.Collection, id string, updater datastore.Updater) error { diff --git a/pkg/datastore/pipedstore.go b/pkg/datastore/pipedstore.go index 33977b5b15..0eed346a9f 100644 --- a/pkg/datastore/pipedstore.go +++ b/pkg/datastore/pipedstore.go @@ -16,6 +16,8 @@ package datastore import ( "context" + "encoding/json" + "fmt" "time" "github.com/pipe-cd/pipecd/pkg/model" @@ -50,6 +52,23 @@ func (p *pipedCollection) GetUpdatableShard() (Shard, error) { } } +func (p *pipedCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode Piped object: %s" + + me, ok := e.(*model.Piped) + if !ok { + return nil, fmt.Errorf(errFmt, "type not matched") + } + + data, err := json.Marshal(me) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + return map[Shard][]byte{ + ClientShard: data, + }, nil +} + type PipedStore interface { Add(ctx context.Context, piped *model.Piped) error Get(ctx context.Context, id string) (*model.Piped, error) diff --git a/pkg/datastore/projectstore.go b/pkg/datastore/projectstore.go index 78bd8cc912..da22d39a26 100644 --- a/pkg/datastore/projectstore.go +++ b/pkg/datastore/projectstore.go @@ -16,6 +16,8 @@ package datastore import ( "context" + "encoding/json" + "fmt" "time" "github.com/pipe-cd/pipecd/pkg/model" @@ -50,6 +52,23 @@ func (p *projectCollection) GetUpdatableShard() (Shard, error) { } } +func (p *projectCollection) Encode(e interface{}) (map[Shard][]byte, error) { + const errFmt = "failed while encode Project object: %s" + + me, ok := e.(*model.Project) + if !ok { + return nil, fmt.Errorf(errFmt, "type not matched") + } + + data, err := json.Marshal(me) + if err != nil { + return nil, fmt.Errorf(errFmt, "unable to marshal entity data") + } + return map[Shard][]byte{ + ClientShard: data, + }, nil +} + type ProjectStore interface { Add(ctx context.Context, proj *model.Project) error Get(ctx context.Context, id string) (*model.Project, error)