diff --git a/pkg/datastore/filedb/BUILD.bazel b/pkg/datastore/filedb/BUILD.bazel index 1f5046629f..30d573309e 100644 --- a/pkg/datastore/filedb/BUILD.bazel +++ b/pkg/datastore/filedb/BUILD.bazel @@ -5,11 +5,15 @@ go_library( srcs = [ "codec.go", "filedb.go", + "filter.go", + "iterator.go", ], importpath = "github.com/pipe-cd/pipecd/pkg/datastore/filedb", visibility = ["//visibility:public"], deps = [ + "//pkg/cache:go_default_library", "//pkg/datastore:go_default_library", + "//pkg/datastore/filedb/objectcache:go_default_library", "//pkg/filestore:go_default_library", "@org_uber_go_zap//:go_default_library", ], diff --git a/pkg/datastore/filedb/filedb.go b/pkg/datastore/filedb/filedb.go index 343bc6b87b..dde3aa5029 100644 --- a/pkg/datastore/filedb/filedb.go +++ b/pkg/datastore/filedb/filedb.go @@ -17,16 +17,20 @@ package filedb import ( "context" "fmt" + "path/filepath" "go.uber.org/zap" + "github.com/pipe-cd/pipecd/pkg/cache" "github.com/pipe-cd/pipecd/pkg/datastore" + "github.com/pipe-cd/pipecd/pkg/datastore/filedb/objectcache" "github.com/pipe-cd/pipecd/pkg/filestore" ) type FileDB struct { - backend filestore.Store - logger *zap.Logger + backend filestore.Store + objectCache objectcache.Cache + logger *zap.Logger } type Option func(*FileDB) @@ -37,10 +41,11 @@ func WithLogger(logger *zap.Logger) Option { } } -func NewFileDB(fs filestore.Store, opts ...Option) (*FileDB, error) { +func NewFileDB(fs filestore.Store, c cache.Cache, opts ...Option) (*FileDB, error) { fd := &FileDB{ - backend: fs, - logger: zap.NewNop(), + backend: fs, + objectCache: objectcache.NewCache(c), + logger: zap.NewNop(), } for _, opt := range opts { opt(fd) @@ -61,11 +66,98 @@ func (f *FileDB) fetch(ctx context.Context, path string) ([]byte, error) { } func (f *FileDB) Find(ctx context.Context, col datastore.Collection, opts datastore.ListOptions) (datastore.Iterator, error) { - _, ok := col.(datastore.ShardStorable) + scol, ok := col.(datastore.ShardStorable) if !ok { return nil, datastore.ErrUnsupported } - return nil, datastore.ErrUnimplemented + + var ( + kind = col.Kind() + shards = scol.ListInUsedShards() + // Map of objects values with the first key is the object id. + objects map[string][][]byte + ) + + for _, shard := range shards { + dpath := makeHotStorageDirPath(kind, shard) + parts, err := f.backend.List(ctx, dpath) + if err != nil { + f.logger.Error("failed to find entities", + zap.String("kind", kind), + zap.Error(err), + ) + return nil, err + } + + if objects == nil { + objects = make(map[string][][]byte, len(parts)) + } + for _, obj := range parts { + id := filepath.Base(obj.Path) + + // Try to get object content from objectCache. + cdata, err := f.objectCache.Get(shard, id, obj.Etag) + if err == nil { + objects[id] = append(objects[id], cdata) + continue + } + + // If there is no object content found from objectCache, try fetching + // content under the object path. + data, err := f.fetch(ctx, obj.Path) + if err != nil { + f.logger.Error("failed to fetch entity part", + zap.String("kind", kind), + zap.String("id", id), + zap.Error(err), + ) + return nil, err + } + + // Store fetched data to cache. + if err = f.objectCache.Put(shard, id, obj.Etag, data); err != nil { + f.logger.Error("failed to store entity part to cache", + zap.String("kind", kind), + zap.String("id", id), + zap.String("etag", obj.Etag), + zap.Error(err), + ) + } + + objects[id] = append(objects[id], data) + } + } + + entities := make([]interface{}, 0, len(objects)) + for id, obj := range objects { + e := col.Factory()() + if err := decode(col, e, obj...); err != nil { + f.logger.Error("failed to build entity", + zap.String("kind", kind), + zap.String("id", id), + zap.Error(err), + ) + return nil, err + } + + pass, err := filter(col, e, opts.Filters) + if err != nil { + f.logger.Error("failed to filter entity", + zap.String("kind", kind), + zap.String("id", id), + zap.Error(err), + ) + return nil, err + } + + if pass { + entities = append(entities, e) + } + } + + return &Iterator{ + data: entities, + }, nil } func (f *FileDB) Get(ctx context.Context, col datastore.Collection, id string, v interface{}) error { @@ -141,3 +233,7 @@ func makeHotStorageFilePath(kind, id string, shard datastore.Shard) string { // TODO: Find a way to separate files by project to avoid fetch resources cross project. return fmt.Sprintf("%s/%s/%s.json", kind, shard, id) } + +func makeHotStorageDirPath(kind string, shard datastore.Shard) string { + return fmt.Sprintf("%s/%s/", kind, shard) +} diff --git a/pkg/datastore/filedb/filter.go b/pkg/datastore/filedb/filter.go new file mode 100644 index 0000000000..cddfce190b --- /dev/null +++ b/pkg/datastore/filedb/filter.go @@ -0,0 +1,33 @@ +// Copyright 2022 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filedb + +import ( + "github.com/pipe-cd/pipecd/pkg/datastore" +) + +// TODO: Implement filterable interface for each collection. +type filterable interface { + Match(e interface{}, filters []datastore.ListFilter) (bool, error) +} + +func filter(col datastore.Collection, e interface{}, filters []datastore.ListFilter) (bool, error) { + fcol, ok := col.(filterable) + if !ok { + return false, datastore.ErrUnsupported + } + + return fcol.Match(e, filters) +} diff --git a/pkg/datastore/filedb/iterator.go b/pkg/datastore/filedb/iterator.go new file mode 100644 index 0000000000..93edd48e7c --- /dev/null +++ b/pkg/datastore/filedb/iterator.go @@ -0,0 +1,29 @@ +// Copyright 2022 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filedb + +import "github.com/pipe-cd/pipecd/pkg/datastore" + +type Iterator struct { + data []interface{} +} + +func (it *Iterator) Next(dst interface{}) error { + return datastore.ErrUnimplemented +} + +func (it *Iterator) Cursor() (string, error) { + return "", datastore.ErrUnimplemented +} diff --git a/pkg/datastore/filedb/objectcache/BUILD.bazel b/pkg/datastore/filedb/objectcache/BUILD.bazel new file mode 100644 index 0000000000..0a4d142e62 --- /dev/null +++ b/pkg/datastore/filedb/objectcache/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["cache.go"], + importpath = "github.com/pipe-cd/pipecd/pkg/datastore/filedb/objectcache", + visibility = ["//visibility:public"], + deps = [ + "//pkg/cache:go_default_library", + "//pkg/datastore:go_default_library", + ], +) diff --git a/pkg/datastore/filedb/objectcache/cache.go b/pkg/datastore/filedb/objectcache/cache.go new file mode 100644 index 0000000000..598e2f4ae7 --- /dev/null +++ b/pkg/datastore/filedb/objectcache/cache.go @@ -0,0 +1,76 @@ +// Copyright 2022 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package objectcache + +import ( + "encoding/json" + "fmt" + + "github.com/pipe-cd/pipecd/pkg/cache" + "github.com/pipe-cd/pipecd/pkg/datastore" +) + +type Cache interface { + Get(shard datastore.Shard, id, etag string) ([]byte, error) + Put(shard datastore.Shard, id, etag string, val []byte) error +} + +type objectCache struct { + backend cache.Cache +} + +func NewCache(c cache.Cache) Cache { + return &objectCache{backend: c} +} + +type objectValue struct { + Etag string `json:"etag"` + Data []byte `json:"data"` +} + +func (o *objectCache) Get(shard datastore.Shard, id, etag string) ([]byte, error) { + raw, err := o.backend.Get(makeObjectKey(shard, id)) + if err != nil { + return nil, err + } + + var obj objectValue + if err = json.Unmarshal(raw.([]byte), &obj); err != nil { + return nil, err + } + + if etag == obj.Etag { + return obj.Data, nil + } + return nil, cache.ErrNotFound +} + +func (o *objectCache) Put(shard datastore.Shard, id, etag string, val []byte) error { + obj := &objectValue{ + Etag: etag, + Data: val, + } + + data, err := json.Marshal(obj) + if err != nil { + return err + } + + return o.backend.Put(makeObjectKey(shard, id), data) +} + +func makeObjectKey(shard datastore.Shard, id string) string { + return fmt.Sprintf("FILEDB:OBJECT:%s:%s", shard, id) +}