Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/datastore/apikey.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -51,6 +52,23 @@ func (a *apiKeyCollection) GetUpdatableShard() (Shard, error) {
}
}

func (a *apiKeyCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode APIKey object: %s"

me, ok := e.(*model.APIKey)
if !ok {
return nil, fmt.Errorf("type not matched")
}

data, err := json.Marshal(me)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}
return map[Shard][]byte{
ClientShard: data,
}, nil
}

type APIKeyStore interface {
Add(ctx context.Context, k *model.APIKey) error
Get(ctx context.Context, id string) (*model.APIKey, error)
Expand Down
54 changes: 54 additions & 0 deletions pkg/datastore/applicationstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -54,6 +55,59 @@ func (a *applicationCollection) GetUpdatableShard() (Shard, error) {
}
}

func (a *applicationCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode Application object: %s"

me, ok := e.(*model.Application)
if !ok {
return nil, fmt.Errorf(errFmt, "type not matched")
}

// TODO: Find a way to generate function to build this kind of object by specifying a field tag in the proto file.
// For example:
// ```proto
// message Application {
// // The generated unique identifier.
// string id = 1 [(validate.rules).string.min_len = 1, shard=client];
// ```
clientShardStruct := model.Application{
Id: me.Id,
ProjectId: me.ProjectId,
Kind: me.Kind,
GitPath: me.GitPath,
CloudProvider: me.CloudProvider,
Disabled: me.Disabled,
Deleted: me.Deleted,
DeletedAt: me.DeletedAt,
CreatedAt: me.CreatedAt,
UpdatedAt: me.UpdatedAt,
}
cdata, err := json.Marshal(&clientShardStruct)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}

agentShardStruct := model.Application{
Name: me.Name,
Description: me.Description,
Labels: me.Labels,
SyncState: me.SyncState,
Deploying: me.Deploying,
MostRecentlySuccessfulDeployment: me.MostRecentlySuccessfulDeployment,
MostRecentlyTriggeredDeployment: me.MostRecentlyTriggeredDeployment,
UpdatedAt: me.UpdatedAt,
}
adata, err := json.Marshal(&agentShardStruct)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}

return map[Shard][]byte{
ClientShard: cdata,
AgentShard: adata,
}, nil
}

type ApplicationStore interface {
Add(ctx context.Context, app *model.Application) error
Get(ctx context.Context, id string) (*model.Application, error)
Expand Down
29 changes: 29 additions & 0 deletions pkg/datastore/commandstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,35 @@ func (c *commandCollection) Decode(e interface{}, parts ...[]byte) error {
return nil
}

func (c *commandCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode Command object: %s"

me, ok := e.(*model.Command)
if !ok {
return nil, fmt.Errorf(errFmt, "type not matched")
}

agentShardStruct := me
adata, err := json.Marshal(&agentShardStruct)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}

opsShardStruct := model.Command{
Status: me.Status,
UpdatedAt: me.UpdatedAt,
}
odata, err := json.Marshal(&opsShardStruct)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}

return map[Shard][]byte{
AgentShard: adata,
OpsShard: odata,
}, nil
}

type CommandStore interface {
Add(ctx context.Context, cmd *model.Command) error
Get(ctx context.Context, id string) (*model.Command, error)
Expand Down
6 changes: 6 additions & 0 deletions pkg/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ type ShardStorable interface {
GetUpdatableShard() (Shard, error)
}

type ShardEncoder interface {
// Encode accepts an object as its input and returns a map which key is the shard and
// value is the raw data which should be stored under the key shard.
Encode(e interface{}) (map[Shard][]byte, error)
}

type ShardDecoder interface {
// Decode unmarshals all given raw data parts to a given entity e.
Decode(e interface{}, parts ...[]byte) error
Expand Down
18 changes: 18 additions & 0 deletions pkg/datastore/deploymentchainstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -51,6 +52,23 @@ func (d *deploymentChainCollection) GetUpdatableShard() (Shard, error) {
}
}

func (d *deploymentChainCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode DeploymentChain object: %s"

me, ok := e.(*model.DeploymentChain)
if !ok {
return nil, fmt.Errorf(errFmt, "type not matched")
}

data, err := json.Marshal(me)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}
return map[Shard][]byte{
OpsShard: data,
}, nil
}

var (
addDeploymentToBlockUpdateFunc = func(deployment *model.Deployment) func(*model.DeploymentChain) error {
return func(dc *model.DeploymentChain) error {
Expand Down
18 changes: 18 additions & 0 deletions pkg/datastore/deploymentstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -51,6 +52,23 @@ func (d *deploymentCollection) GetUpdatableShard() (Shard, error) {
}
}

func (d *deploymentCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode Deployment object: %s"

me, ok := e.(*model.Deployment)
if !ok {
return nil, fmt.Errorf("type not matched")
}

data, err := json.Marshal(me)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}
return map[Shard][]byte{
AgentShard: data,
}, nil
}

var (
toPlannedUpdateFunc = func(summary, statusReason, runningCommitHash, runningConfigFilename, version string, stages []*model.PipelineStage) func(*model.Deployment) error {
return func(d *model.Deployment) error {
Expand Down
19 changes: 19 additions & 0 deletions pkg/datastore/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/pipe-cd/pipecd/pkg/model"
Expand Down Expand Up @@ -50,6 +52,23 @@ func (e *eventCollection) GetUpdatableShard() (Shard, error) {
}
}

func (e *eventCollection) Encode(entity interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode Event object: %s"

me, ok := entity.(*model.Event)
if !ok {
return nil, fmt.Errorf(errFmt, "type not matched")
}

data, err := json.Marshal(me)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}
return map[Shard][]byte{
AgentShard: data,
}, nil
}

type EventStore interface {
Add(ctx context.Context, e model.Event) error
List(ctx context.Context, opts ListOptions) ([]*model.Event, string, error)
Expand Down
12 changes: 12 additions & 0 deletions pkg/datastore/filedb/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,15 @@ func merge(e interface{}, parts ...[]byte) error {
me.SetUpdatedAt(latest)
return nil
}

// encode checks for the given collection object. If the given collection
// implements the `datastore.ShardEncoder` interface, its implementation will
// be used. If not, `datastore.ErrUnsupported` error will be raised.
func encode(col datastore.Collection, e interface{}) (map[datastore.Shard][]byte, error) {
ecol, ok := col.(datastore.ShardEncoder)
if !ok {
return nil, datastore.ErrUnsupported
}

return ecol.Encode(e)
}
27 changes: 23 additions & 4 deletions pkg/datastore/filedb/filedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,30 @@ func (f *FileDB) Get(ctx context.Context, col datastore.Collection, id string, v
}

func (f *FileDB) Create(ctx context.Context, col datastore.Collection, id string, entity interface{}) error {
_, ok := col.(datastore.ShardStorable)
if !ok {
return datastore.ErrUnsupported
kind := col.Kind()
sdata, err := encode(col, entity)
if err != nil {
f.logger.Error("failed to encode entity",
zap.String("id", id),
zap.String("kind", kind),
zap.Error(err),
)
return err
}
return datastore.ErrUnimplemented

for shard, data := range sdata {
path := makeHotStorageFilePath(kind, id, shard)
if err = f.backend.Put(ctx, path, data); err != nil {
f.logger.Error("failed to store entity",
zap.String("id", id),
zap.String("kind", kind),
zap.Error(err),
)
return err
}
}

return nil
}

func (f *FileDB) Update(ctx context.Context, col datastore.Collection, id string, updater datastore.Updater) error {
Expand Down
19 changes: 19 additions & 0 deletions pkg/datastore/pipedstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/pipe-cd/pipecd/pkg/model"
Expand Down Expand Up @@ -50,6 +52,23 @@ func (p *pipedCollection) GetUpdatableShard() (Shard, error) {
}
}

func (p *pipedCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode Piped object: %s"

me, ok := e.(*model.Piped)
if !ok {
return nil, fmt.Errorf(errFmt, "type not matched")
}

data, err := json.Marshal(me)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}
return map[Shard][]byte{
ClientShard: data,
}, nil
}

type PipedStore interface {
Add(ctx context.Context, piped *model.Piped) error
Get(ctx context.Context, id string) (*model.Piped, error)
Expand Down
19 changes: 19 additions & 0 deletions pkg/datastore/projectstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package datastore

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/pipe-cd/pipecd/pkg/model"
Expand Down Expand Up @@ -50,6 +52,23 @@ func (p *projectCollection) GetUpdatableShard() (Shard, error) {
}
}

func (p *projectCollection) Encode(e interface{}) (map[Shard][]byte, error) {
const errFmt = "failed while encode Project object: %s"

me, ok := e.(*model.Project)
if !ok {
return nil, fmt.Errorf(errFmt, "type not matched")
}

data, err := json.Marshal(me)
if err != nil {
return nil, fmt.Errorf(errFmt, "unable to marshal entity data")
}
return map[Shard][]byte{
ClientShard: data,
}, nil
}

type ProjectStore interface {
Add(ctx context.Context, proj *model.Project) error
Get(ctx context.Context, id string) (*model.Project, error)
Expand Down