diff --git a/staging/operator-registry/cmd/opm/serve/serve.go b/staging/operator-registry/cmd/opm/serve/serve.go index 633ad9ef3f..8b7d280764 100644 --- a/staging/operator-registry/cmd/opm/serve/serve.go +++ b/staging/operator-registry/cmd/opm/serve/serve.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "net" "net/http" endpoint "net/http/pprof" @@ -19,17 +20,18 @@ import ( "github.com/operator-framework/operator-registry/pkg/api" health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1" + "github.com/operator-framework/operator-registry/pkg/cache" "github.com/operator-framework/operator-registry/pkg/lib/dns" "github.com/operator-framework/operator-registry/pkg/lib/graceful" "github.com/operator-framework/operator-registry/pkg/lib/log" - "github.com/operator-framework/operator-registry/pkg/registry" "github.com/operator-framework/operator-registry/pkg/server" ) type serve struct { - configDir string - cacheDir string - cacheOnly bool + configDir string + cacheDir string + cacheOnly bool + cacheEnforceIntegrity bool port string terminationLog string @@ -59,15 +61,19 @@ startup. Changes made to the declarative config after the this command starts will not be reflected in the served content. `, Args: cobra.ExactArgs(1), - PreRunE: func(_ *cobra.Command, args []string) error { + PreRun: func(_ *cobra.Command, args []string) { s.configDir = args[0] if s.debug { logger.SetLevel(logrus.DebugLevel) } - return nil }, - RunE: func(cmd *cobra.Command, _ []string) error { - return s.run(cmd.Context()) + Run: func(cmd *cobra.Command, _ []string) { + if !cmd.Flags().Changed("cache-enforce-integrity") { + s.cacheEnforceIntegrity = s.cacheDir != "" && !s.cacheOnly + } + if err := s.run(cmd.Context()); err != nil { + logger.Fatal(err) + } }, } @@ -77,6 +83,7 @@ will not be reflected in the served content. 
cmd.Flags().StringVar(&s.pprofAddr, "pprof-addr", "", "address of startup profiling endpoint (addr:port format)") cmd.Flags().StringVar(&s.cacheDir, "cache-dir", "", "if set, sync and persist server cache directory") cmd.Flags().BoolVar(&s.cacheOnly, "cache-only", false, "sync the serve cache and exit without serving") + cmd.Flags().BoolVar(&s.cacheEnforceIntegrity, "cache-enforce-integrity", false, "exit with error if cache is not present or has been invalidated. (default: true when --cache-dir is set and --cache-only is false, false otherwise), ") return cmd } @@ -102,11 +109,38 @@ func (s *serve) run(ctx context.Context) error { s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port}) - store, err := registry.NewQuerierFromFS(os.DirFS(s.configDir), s.cacheDir) - defer store.Close() + if s.cacheDir == "" && s.cacheEnforceIntegrity { + return fmt.Errorf("--cache-dir must be specified with --cache-enforce-integrity") + } + + if s.cacheDir == "" { + s.cacheDir, err = os.MkdirTemp("", "opm-serve-cache-") + if err != nil { + return err + } + defer os.RemoveAll(s.cacheDir) + } + + store, err := cache.New(s.cacheDir) if err != nil { return err } + if storeCloser, ok := store.(io.Closer); ok { + defer storeCloser.Close() + } + if s.cacheEnforceIntegrity { + if err := store.CheckIntegrity(os.DirFS(s.configDir)); err != nil { + return err + } + if err := store.Load(); err != nil { + return err + } + } else { + if err := cache.LoadOrRebuild(store, os.DirFS(s.configDir)); err != nil { + return err + } + } + if s.cacheOnly { return nil } diff --git a/staging/operator-registry/go.mod b/staging/operator-registry/go.mod index d00390c072..714c6abe51 100644 --- a/staging/operator-registry/go.mod +++ b/staging/operator-registry/go.mod @@ -35,7 +35,7 @@ require ( golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/sys v0.0.0-20220412211240-33da011f77ad - google.golang.org/grpc v1.45.0 + 
google.golang.org/grpc v1.46.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200709232328-d8193ee9cc3e google.golang.org/protobuf v1.28.0 gopkg.in/yaml.v2 v2.4.0 @@ -59,7 +59,7 @@ require ( github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d // indirect github.com/alessio/shellescape v1.4.1 // indirect - github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e // indirect + github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/blang/semver v3.5.1+incompatible // indirect @@ -93,7 +93,8 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/google/cel-go v0.10.1 // indirect + github.com/golang/snappy v0.0.3 // indirect + github.com/google/cel-go v0.12.4 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect @@ -107,7 +108,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect - github.com/klauspost/compress v1.11.13 // indirect + github.com/klauspost/compress v1.12.3 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect @@ -154,7 +155,7 @@ require ( golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto 
v0.0.0-20220407144326-9054f6ed7bac // indirect + google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/square/go-jose.v2 v2.6.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/staging/operator-registry/go.sum b/staging/operator-registry/go.sum index 27ac35590b..243708d639 100644 --- a/staging/operator-registry/go.sum +++ b/staging/operator-registry/go.sum @@ -87,8 +87,9 @@ github.com/alessio/shellescape v1.4.1 h1:V7yhSDDn8LP4lc4jS8pFkt0zCnzVJlG5JXy9BVK github.com/alessio/shellescape v1.4.1/go.mod h1:PZAiSCk0LJaZkiCSkPv8qIobYglO3FPpyFjDCtHLS30= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e h1:GCzyKMDDjSGnlpl3clrdAK7I1AaVoaiKDOYkUzChZzg= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= +github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed h1:ue9pVfIcP+QMEjfgo/Ez4ZjNZfonGgR6NgjMaJMu1Cg= +github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20220418222510-f25a4f6275ed/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -147,8 +148,8 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod 
h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk= @@ -250,7 +251,7 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= -github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -374,12 +375,15 @@ github.com/golang/protobuf v1.5.2/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangplus/testing v0.0.0-20180327235837-af21d9c3145e/go.mod h1:0AA//k/eakGydO4jKRoRL2j92ZKSzTgj9tclaCrvXHk= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= -github.com/google/cel-go v0.10.1 h1:MQBGSZGnDwh7T/un+mzGKOMz3x+4E/GDPprWjDL+1Jg= github.com/google/cel-go v0.10.1/go.mod h1:U7ayypeSkw23szu4GaQTPJGx66c20mx8JklMSxrmI1w= +github.com/google/cel-go v0.12.4 h1:YINKfuHZ8n72tPOqSPZBwGiDpew2CJS48mdM5W8LZQU= +github.com/google/cel-go v0.12.4/go.mod h1:Av7CU6r6X3YmcHR9GXqVDaEJYfEtSxl6wvIjUQTriCw= github.com/google/cel-spec v0.6.0/go.mod h1:Nwjgxy5CbjlPrtCWjeDjUyKMl8w41YBYGjsyDdqk0xA= github.com/google/gnostic v0.5.7-v3refs h1:FhTMOKj2VhjpouxvWJAV1TL304uMlb9zcDqkl6cEI54= github.com/google/gnostic v0.5.7-v3refs/go.mod h1:73MKFl6jIHelAJNaBGFzt3SPtZULs9dYrGFt8OiIsHQ= @@ -517,8 +521,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.11.13 
h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= -github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= +github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -1257,8 +1261,8 @@ google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1/go.mod h1:9lPAdzaE google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20220107163113-42d7afdf6368/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac h1:qSNTkEN+L2mvWcLgJOR+8bdHX9rN/IdU3A1Ghpfb1Rg= -google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= +google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21 h1:hrbNEivu7Zn1pxvHk6MBrq9iE22woVILTHqexqBxe6I= +google.golang.org/genproto v0.0.0-20220502173005-c8bf987b8c21/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1284,8 +1288,8 @@ google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc 
v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= -google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= +google.golang.org/grpc v1.46.0 h1:oCjezcn6g6A75TGoKYBPgKmVBLexhYLM6MebdrPApP8= +google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200709232328-d8193ee9cc3e h1:4BwkYybqoRhPKm97iNO3ACkxj26G0hC18CaO9QXOxto= google.golang.org/grpc/cmd/protoc-gen-go-grpc v0.0.0-20200709232328-d8193ee9cc3e/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw= google.golang.org/grpc/examples v0.0.0-20201130180447-c456688b1860/go.mod h1:Ly7ZA/ARzg8fnPU9TyZIxoz33sEUuWX7txiqs8lPTgE= diff --git a/staging/operator-registry/pkg/cache/cache.go b/staging/operator-registry/pkg/cache/cache.go new file mode 100644 index 0000000000..7df08d6f50 --- /dev/null +++ b/staging/operator-registry/pkg/cache/cache.go @@ -0,0 +1,104 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/operator-framework/operator-registry/pkg/api" + "github.com/operator-framework/operator-registry/pkg/registry" +) + +type Cache interface { + registry.GRPCQuery + + CheckIntegrity(fbc fs.FS) error + Build(fbc fs.FS) error + Load() error +} + +func LoadOrRebuild(c Cache, fbc fs.FS) error { + if err := c.CheckIntegrity(fbc); err != nil { + if err := c.Build(fbc); err != nil { + return err + } + } + return c.Load() +} + +// New creates a new Cache. It chooses a cache implementation based +// on the files it finds in the cache directory, with a preference for the +// latest iteration of the cache implementation. 
It returns an error if +// cacheDir exists and contains unexpected files. +func New(cacheDir string) (Cache, error) { + entries, err := os.ReadDir(cacheDir) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("detect cache format: read cache directory: %v", err) + } + jsonCache := sets.NewString(jsonDir, jsonDigestFile) + + found := sets.NewString() + for _, e := range entries { + found.Insert(e.Name()) + } + + // Preference (and currently only supported) is the JSON-based cache implementation. + if found.IsSuperset(jsonCache) || len(entries) == 0 { + return NewJSON(cacheDir), nil + } + + // Anything else is unexpected. + return nil, fmt.Errorf("cache directory has unexpected contents") +} + +func ensureEmptyDir(dir string, mode os.FileMode) error { + if err := os.MkdirAll(dir, mode); err != nil { + return err + } + entries, err := os.ReadDir(dir) + if err != nil { + return err + } + for _, entry := range entries { + if err := os.RemoveAll(filepath.Join(dir, entry.Name())); err != nil { + return err + } + } + return nil +} + +func doesBundleProvide(ctx context.Context, c Cache, pkgName, chName, bundleName, group, version, kind string) (bool, error) { + apiBundle, err := c.GetBundle(ctx, pkgName, chName, bundleName) + if err != nil { + return false, fmt.Errorf("get bundle %q: %v", bundleName, err) + } + for _, gvk := range apiBundle.ProvidedApis { + if gvk.Group == group && gvk.Version == version && gvk.Kind == kind { + return true, nil + } + } + return false, nil +} + +type sliceBundleSender []*api.Bundle + +func (s *sliceBundleSender) Send(b *api.Bundle) error { + *s = append(*s, b) + return nil +} + +func listBundles(ctx context.Context, c Cache) ([]*api.Bundle, error) { + var bundleSender sliceBundleSender + + err := c.SendBundles(ctx, &bundleSender) + if err != nil { + return nil, err + } + + return bundleSender, nil +} diff --git a/staging/operator-registry/pkg/registry/query_test.go b/staging/operator-registry/pkg/cache/cache_test.go 
similarity index 86% rename from staging/operator-registry/pkg/registry/query_test.go rename to staging/operator-registry/pkg/cache/cache_test.go index a732de7413..a7140d6111 100644 --- a/staging/operator-registry/pkg/registry/query_test.go +++ b/staging/operator-registry/pkg/cache/cache_test.go @@ -1,4 +1,4 @@ -package registry +package cache import ( "context" @@ -8,14 +8,11 @@ import ( "github.com/stretchr/testify/require" - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-registry/alpha/model" - "github.com/operator-framework/operator-registry/alpha/property" + "github.com/operator-framework/operator-registry/pkg/registry" ) -func TestQuerier_GetBundle(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetBundle(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { b, err := testQuerier.GetBundle(context.TODO(), "etcd", "singlenamespace-alpha", "etcdoperator.v0.9.4") require.NoError(t, err) require.Equal(t, b.PackageName, "etcd") @@ -24,9 +21,8 @@ func TestQuerier_GetBundle(t *testing.T) { } } -func TestQuerier_GetBundleForChannel(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetBundleForChannel(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { b, err := testQuerier.GetBundleForChannel(context.TODO(), "etcd", "singlenamespace-alpha") require.NoError(t, err) require.NotNil(t, b) @@ -36,9 +32,8 @@ func TestQuerier_GetBundleForChannel(t *testing.T) { } } -func TestQuerier_GetBundleThatProvides(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetBundleThatProvides(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { b, err := testQuerier.GetBundleThatProvides(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") 
require.NoError(t, err) require.NotNil(t, b) @@ -48,9 +43,8 @@ func TestQuerier_GetBundleThatProvides(t *testing.T) { } } -func TestQuerier_GetBundleThatReplaces(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetBundleThatReplaces(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { b, err := testQuerier.GetBundleThatReplaces(context.TODO(), "etcdoperator.v0.9.0", "etcd", "singlenamespace-alpha") require.NoError(t, err) require.NotNil(t, b) @@ -60,13 +54,12 @@ func TestQuerier_GetBundleThatReplaces(t *testing.T) { } } -func TestQuerier_GetChannelEntriesThatProvide(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetChannelEntriesThatProvide(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { entries, err := testQuerier.GetChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") require.NoError(t, err) require.NotNil(t, entries) - require.ElementsMatch(t, []*ChannelEntry{ + require.ElementsMatch(t, []*registry.ChannelEntry{ { PackageName: "etcd", ChannelName: "singlenamespace-alpha", @@ -107,13 +100,12 @@ func TestQuerier_GetChannelEntriesThatProvide(t *testing.T) { } } -func TestQuerier_GetChannelEntriesThatReplace(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetChannelEntriesThatReplace(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { entries, err := testQuerier.GetChannelEntriesThatReplace(context.TODO(), "etcdoperator.v0.9.0") require.NoError(t, err) require.NotNil(t, entries) - require.ElementsMatch(t, []*ChannelEntry{ + require.ElementsMatch(t, []*registry.ChannelEntry{ { PackageName: "etcd", ChannelName: "singlenamespace-alpha", @@ -130,13 +122,12 @@ func TestQuerier_GetChannelEntriesThatReplace(t *testing.T) { } } -func 
TestQuerier_GetLatestChannelEntriesThatProvide(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetLatestChannelEntriesThatProvide(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { entries, err := testQuerier.GetLatestChannelEntriesThatProvide(context.TODO(), "etcd.database.coreos.com", "v1beta2", "EtcdBackup") require.NoError(t, err) require.NotNil(t, entries) - require.ElementsMatch(t, []*ChannelEntry{ + require.ElementsMatch(t, []*registry.ChannelEntry{ { PackageName: "etcd", ChannelName: "singlenamespace-alpha", @@ -153,17 +144,16 @@ func TestQuerier_GetLatestChannelEntriesThatProvide(t *testing.T) { } } -func TestQuerier_GetPackage(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_GetPackage(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { p, err := testQuerier.GetPackage(context.TODO(), "etcd") require.NoError(t, err) require.NotNil(t, p) - expected := &PackageManifest{ + expected := ®istry.PackageManifest{ PackageName: "etcd", DefaultChannelName: "singlenamespace-alpha", - Channels: []PackageChannel{ + Channels: []registry.PackageChannel{ { Name: "singlenamespace-alpha", CurrentCSVName: "etcdoperator.v0.9.4", @@ -185,9 +175,8 @@ func TestQuerier_GetPackage(t *testing.T) { } } -func TestQuerier_ListBundles(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_ListBundles(t *testing.T) { + for _, testQuerier := range genTestCaches(t, validFS) { bundles, err := testQuerier.ListBundles(context.TODO()) require.NoError(t, err) require.NotNil(t, bundles) @@ -199,9 +188,8 @@ func TestQuerier_ListBundles(t *testing.T) { } } -func TestQuerier_ListPackages(t *testing.T) { - for _, testQuerier := range genTestQueriers(t, validFS) { - defer testQuerier.Close() +func TestCache_ListPackages(t *testing.T) { + for _, 
testQuerier := range genTestCaches(t, validFS) { packages, err := testQuerier.ListPackages(context.TODO()) require.NoError(t, err) require.NotNil(t, packages) @@ -209,61 +197,21 @@ func TestQuerier_ListPackages(t *testing.T) { } } -func TestQuerier_BadBundleRaisesError(t *testing.T) { +func genTestCaches(t *testing.T, fbcFS fs.FS) []Cache { t.Helper() - t.Run("InvalidModel", func(t *testing.T) { - // Convert a good FS into a model (we need the model to validate - // in the declcfg.ConvertToModel step) - cfg, err := declcfg.LoadFS(validFS) - require.NoError(t, err) + caches := []Cache{ + NewJSON(t.TempDir()), + } - m, err := declcfg.ConvertToModel(*cfg) + for _, c := range caches { + err := c.Build(fbcFS) require.NoError(t, err) + err = c.Load() + require.NoError(t, err) + } - // break the model by adding another package property - bundle := func() *model.Bundle { - for _, pkg := range m { - for _, ch := range pkg.Channels { - for _, bundle := range ch.Bundles { - return bundle - } - } - } - return nil - }() - - bundle.Properties = append(bundle.Properties, property.Property{ - Type: PackageType, - Value: []byte("{\"packageName\": \"another-package\", \"version\": \"1.0.0\"}"), - }) - - _, err = NewQuerier(m) - require.EqualError(t, err, `parse properties: expected exactly 1 property of type "olm.package", found 2`) - }) - - t.Run("InvalidFS", func(t *testing.T) { - _, err := NewQuerierFromFS(badBundleFS, t.TempDir()) - require.EqualError(t, err, `package "cockroachdb" bundle "cockroachdb.v5.0.3" must have exactly 1 "olm.package" property, found 2`) - }) -} - -func genTestQueriers(t *testing.T, fbcFS fs.FS) []*Querier { - t.Helper() - - cfg, err := declcfg.LoadFS(fbcFS) - require.NoError(t, err) - - m, err := declcfg.ConvertToModel(*cfg) - require.NoError(t, err) - - fromModel, err := NewQuerier(m) - require.NoError(t, err) - - fromFS, err := NewQuerierFromFS(fbcFS, t.TempDir()) - require.NoError(t, err) - - return []*Querier{fromModel, fromFS} + return caches } var 
validFS = fstest.MapFS{ diff --git a/staging/operator-registry/pkg/cache/json.go b/staging/operator-registry/pkg/cache/json.go new file mode 100644 index 0000000000..47a54952ba --- /dev/null +++ b/staging/operator-registry/pkg/cache/json.go @@ -0,0 +1,257 @@ +package cache + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "hash/fnv" + "io/fs" + "os" + "path/filepath" + "strings" + + "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/pkg/api" + "github.com/operator-framework/operator-registry/pkg/registry" +) + +var _ Cache = &JSON{} + +type JSON struct { + baseDir string + + packageIndex + apiBundles map[apiBundleKey]string +} + +const ( + jsonCacheModeDir = 0750 + jsonCacheModeFile = 0640 +) + +type apiBundleKey struct { + pkgName string + chName string + name string +} + +func (q *JSON) loadAPIBundle(k apiBundleKey) (*api.Bundle, error) { + filename, ok := q.apiBundles[k] + if !ok { + return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", k.pkgName, k.chName, k.name) + } + d, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + var b api.Bundle + if err := json.Unmarshal(d, &b); err != nil { + return nil, err + } + return &b, nil +} + +func (q *JSON) ListBundles(ctx context.Context) ([]*api.Bundle, error) { + return listBundles(ctx, q) +} + +func (q *JSON) SendBundles(_ context.Context, s registry.BundleSender) error { + for _, pkg := range q.packageIndex { + for _, ch := range pkg.Channels { + for _, b := range ch.Bundles { + apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) + if err != nil { + return fmt.Errorf("convert bundle %q: %v", b.Name, err) + } + if apiBundle.BundlePath != "" { + // The SQLite-based server + // configures its querier to + // omit these fields when + // bundle path is set. 
+ apiBundle.CsvJson = "" + apiBundle.Object = nil + } + if err := s.Send(apiBundle); err != nil { + return err + } + } + } + } + return nil +} + +func (q *JSON) GetBundle(_ context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) { + pkg, ok := q.packageIndex[pkgName] + if !ok { + return nil, fmt.Errorf("package %q not found", pkgName) + } + ch, ok := pkg.Channels[channelName] + if !ok { + return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) + } + b, ok := ch.Bundles[csvName] + if !ok { + return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", pkgName, channelName, csvName) + } + apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) + if err != nil { + return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) + } + + // unset Replaces and Skips (sqlite query does not populate these fields) + apiBundle.Replaces = "" + apiBundle.Skips = nil + return apiBundle, nil +} + +func (q *JSON) GetBundleForChannel(ctx context.Context, pkgName string, channelName string) (*api.Bundle, error) { + return q.packageIndex.GetBundleForChannel(ctx, q, pkgName, channelName) +} + +func (q *JSON) GetBundleThatReplaces(ctx context.Context, name, pkgName, channelName string) (*api.Bundle, error) { + return q.packageIndex.GetBundleThatReplaces(ctx, q, name, pkgName, channelName) +} + +func (q *JSON) GetChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) { + return q.packageIndex.GetChannelEntriesThatProvide(ctx, q, group, version, kind) +} + +func (q *JSON) GetLatestChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) { + return q.packageIndex.GetLatestChannelEntriesThatProvide(ctx, q, group, version, kind) +} + +func (q *JSON) GetBundleThatProvides(ctx context.Context, group, version, kind string) (*api.Bundle, error) { + return q.packageIndex.GetBundleThatProvides(ctx, q, group, version, 
kind) +} + +func NewJSON(baseDir string) *JSON { + return &JSON{baseDir: baseDir} +} + +const ( + jsonDigestFile = "digest" + jsonDir = "cache" + packagesFile = jsonDir + string(filepath.Separator) + "packages.json" +) + +func (q *JSON) CheckIntegrity(fbcFsys fs.FS) error { + existingDigest, err := q.existingDigest() + if err != nil { + return fmt.Errorf("read existing cache digest: %v", err) + } + computedDigest, err := q.computeDigest(fbcFsys) + if err != nil { + return fmt.Errorf("compute digest: %v", err) + } + if existingDigest != computedDigest { + return fmt.Errorf("cache requires rebuild: cache reports digest as %q, but computed digest is %q", existingDigest, computedDigest) + } + return nil +} + +func (q *JSON) existingDigest() (string, error) { + existingDigestBytes, err := os.ReadFile(filepath.Join(q.baseDir, jsonDigestFile)) + if err != nil { + return "", err + } + return strings.TrimSpace(string(existingDigestBytes)), nil +} + +func (q *JSON) computeDigest(fbcFsys fs.FS) (string, error) { + computedHasher := fnv.New64a() + if err := fsToTar(computedHasher, fbcFsys); err != nil { + return "", err + } + + if cacheFS, err := fs.Sub(os.DirFS(q.baseDir), jsonDir); err == nil { + if err := fsToTar(computedHasher, cacheFS); err != nil && !errors.Is(err, os.ErrNotExist) { + return "", fmt.Errorf("compute hash: %v", err) + } + } + return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil +} + +func (q *JSON) Build(fbcFsys fs.FS) error { + // ensure that generated cache is available to all future users + oldUmask := umask(000) + defer umask(oldUmask) + + if err := ensureEmptyDir(q.baseDir, jsonCacheModeDir); err != nil { + return fmt.Errorf("ensure clean base directory: %v", err) + } + if err := ensureEmptyDir(filepath.Join(q.baseDir, jsonDir), jsonCacheModeDir); err != nil { + return fmt.Errorf("ensure clean base directory: %v", err) + } + + fbc, err := declcfg.LoadFS(fbcFsys) + if err != nil { + return err + } + fbcModel, err := declcfg.ConvertToModel(*fbc) + if 
err != nil { + return err + } + + pkgs, err := packagesFromModel(fbcModel) + if err != nil { + return err + } + + packageJson, err := json.Marshal(pkgs) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(q.baseDir, packagesFile), packageJson, jsonCacheModeFile); err != nil { + return err + } + + q.apiBundles = map[apiBundleKey]string{} + for _, p := range fbcModel { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) + if err != nil { + return err + } + jsonBundle, err := json.Marshal(apiBundle) + if err != nil { + return err + } + filename := filepath.Join(q.baseDir, jsonDir, fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + if err := os.WriteFile(filename, jsonBundle, jsonCacheModeFile); err != nil { + return err + } + q.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename + } + } + } + digest, err := q.computeDigest(fbcFsys) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(q.baseDir, jsonDigestFile), []byte(digest), jsonCacheModeFile); err != nil { + return err + } + return nil +} + +func (q *JSON) Load() error { + packagesData, err := os.ReadFile(filepath.Join(q.baseDir, packagesFile)) + if err != nil { + return err + } + if err := json.Unmarshal(packagesData, &q.packageIndex); err != nil { + return err + } + q.apiBundles = map[apiBundleKey]string{} + for _, p := range q.packageIndex { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + filename := filepath.Join(q.baseDir, jsonDir, fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + q.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename + } + } + } + return nil +} diff --git a/staging/operator-registry/pkg/cache/json_test.go b/staging/operator-registry/pkg/cache/json_test.go new file mode 100644 index 0000000000..dee58bacda --- /dev/null +++ b/staging/operator-registry/pkg/cache/json_test.go @@ -0,0 +1,107 @@ +package cache + 
+import ( + "io/fs" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestJSON_StableDigest(t *testing.T) { + cacheDir := t.TempDir() + c := NewJSON(cacheDir) + require.NoError(t, c.Build(validFS)) + + actualDigest, err := c.existingDigest() + require.NoError(t, err) + + // NOTE: The entire purpose of this test is to ensure that we don't change the cache + // implementation and inadvertantly invalidate existing caches. + // + // Therefore, DO NOT CHANGE the expected digest value here unless validFS also + // changes. + // + // If validFS needs to change DO NOT CHANGE the json cache implementation + // in the same pull request. + require.Equal(t, "9adad9ff6cf54e4f", actualDigest) +} + +func TestJSON_CheckIntegrity(t *testing.T) { + type testCase struct { + name string + build bool + fbcFS fs.FS + mod func(tc *testCase, cacheDir string) error + expect func(t *testing.T, err error) + } + testCases := []testCase{ + { + name: "non-existent cache dir", + fbcFS: validFS, + mod: func(tc *testCase, cacheDir string) error { + return os.RemoveAll(cacheDir) + }, + expect: func(t *testing.T, err error) { + require.Error(t, err) + require.Contains(t, err.Error(), "read existing cache digest") + }, + }, + { + name: "empty cache dir", + fbcFS: validFS, + expect: func(t *testing.T, err error) { + require.Error(t, err) + require.Contains(t, err.Error(), "read existing cache digest") + }, + }, + { + name: "valid cache dir", + build: true, + fbcFS: validFS, + expect: func(t *testing.T, err error) { + require.NoError(t, err) + }, + }, + { + name: "different FBC", + build: true, + fbcFS: validFS, + mod: func(tc *testCase, _ string) error { + tc.fbcFS = badBundleFS + return nil + }, + expect: func(t *testing.T, err error) { + require.Error(t, err) + require.Contains(t, err.Error(), "cache requires rebuild") + }, + }, + { + name: "different cache", + build: true, + fbcFS: validFS, + mod: func(tc *testCase, cacheDir string) error { + return 
os.WriteFile(filepath.Join(cacheDir, jsonDir, "foo"), []byte("bar"), jsonCacheModeFile) + }, + expect: func(t *testing.T, err error) { + require.Error(t, err) + require.Contains(t, err.Error(), "cache requires rebuild") + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cacheDir := t.TempDir() + c := NewJSON(cacheDir) + + if tc.build { + require.NoError(t, c.Build(tc.fbcFS)) + } + if tc.mod != nil { + require.NoError(t, tc.mod(&tc, cacheDir)) + } + tc.expect(t, c.CheckIntegrity(tc.fbcFS)) + }) + } +} diff --git a/staging/operator-registry/pkg/cache/pkgs.go b/staging/operator-registry/pkg/cache/pkgs.go new file mode 100644 index 0000000000..d387ddbd09 --- /dev/null +++ b/staging/operator-registry/pkg/cache/pkgs.go @@ -0,0 +1,297 @@ +package cache + +import ( + "context" + "fmt" + "sort" + + "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/alpha/model" + "github.com/operator-framework/operator-registry/pkg/api" + "github.com/operator-framework/operator-registry/pkg/registry" +) + +type packageIndex map[string]cPkg + +func (pkgs packageIndex) ListPackages(_ context.Context) ([]string, error) { + var packages []string + for pkgName := range pkgs { + packages = append(packages, pkgName) + } + return packages, nil +} + +func (pkgs packageIndex) GetPackage(_ context.Context, name string) (*registry.PackageManifest, error) { + pkg, ok := pkgs[name] + if !ok { + return nil, fmt.Errorf("package %q not found", name) + } + + var channels []registry.PackageChannel + for _, ch := range pkg.Channels { + channels = append(channels, registry.PackageChannel{ + Name: ch.Name, + CurrentCSVName: ch.Head, + }) + } + return ®istry.PackageManifest{ + PackageName: pkg.Name, + Channels: channels, + DefaultChannelName: pkg.DefaultChannel, + }, nil +} + +func (pkgs packageIndex) GetChannelEntriesThatReplace(_ context.Context, name string) ([]*registry.ChannelEntry, error) { + var entries 
[]*registry.ChannelEntry + + for _, pkg := range pkgs { + for _, ch := range pkg.Channels { + for _, b := range ch.Bundles { + entries = append(entries, channelEntriesThatReplace(b, name)...) + } + } + } + if len(entries) == 0 { + return nil, fmt.Errorf("no channel entries found that replace %s", name) + } + return entries, nil +} + +func (pkgs packageIndex) GetBundleForChannel(ctx context.Context, c Cache, pkgName string, channelName string) (*api.Bundle, error) { + pkg, ok := pkgs[pkgName] + if !ok { + return nil, fmt.Errorf("package %q not found", pkgName) + } + ch, ok := pkg.Channels[channelName] + if !ok { + return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) + } + return c.GetBundle(ctx, pkg.Name, ch.Name, ch.Head) +} + +func (pkgs packageIndex) GetBundleThatReplaces(ctx context.Context, c Cache, name, pkgName, channelName string) (*api.Bundle, error) { + pkg, ok := pkgs[pkgName] + if !ok { + return nil, fmt.Errorf("package %s not found", pkgName) + } + ch, ok := pkg.Channels[channelName] + if !ok { + return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) + } + + // NOTE: iterating over a map is non-deterministic in Go, so if multiple bundles replace this one, + // the bundle returned by this function is also non-deterministic. The sqlite implementation + // is ALSO non-deterministic because it doesn't use ORDER BY, so its probably okay for this + // implementation to be non-deterministic as well. 
+ for _, b := range ch.Bundles { + if bundleReplaces(b, name) { + return c.GetBundle(ctx, pkg.Name, ch.Name, b.Name) + } + } + return nil, fmt.Errorf("no entry found for package %q, channel %q", pkgName, channelName) +} + +func (pkgs packageIndex) GetChannelEntriesThatProvide(ctx context.Context, c Cache, group, version, kind string) ([]*registry.ChannelEntry, error) { + var entries []*registry.ChannelEntry + + for _, pkg := range pkgs { + for _, ch := range pkg.Channels { + for _, b := range ch.Bundles { + provides, err := doesBundleProvide(ctx, c, b.Package, b.Channel, b.Name, group, version, kind) + if err != nil { + return nil, err + } + if provides { + // TODO(joelanford): It seems like the SQLite query returns + // invalid entries (i.e. where bundle `Replaces` isn't actually + // in channel `ChannelName`). Is that a bug? For now, this mimics + // the sqlite server and returns seemingly invalid channel entries. + // Don't worry about this. Not used anymore. + + entries = append(entries, pkgs.channelEntriesForBundle(b, true)...) + } + } + } + } + if len(entries) == 0 { + return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) + } + return entries, nil +} + +// TODO(joelanford): Need to review the expected functionality of this function. I ran +// +// some experiments with the sqlite version of this function and it seems to only return +// channel heads that provide the GVK (rather than searching down the graph if parent bundles +// don't provide the API). Based on that, this function currently looks at channel heads only. +// --- +// Separate, but possibly related, I noticed there are several channels in the channel entry +// table who's minimum depth is 1. What causes 1 to be minimum depth in some cases and 0 in others? 
+func (pkgs packageIndex) GetLatestChannelEntriesThatProvide(ctx context.Context, c Cache, group, version, kind string) ([]*registry.ChannelEntry, error) { + var entries []*registry.ChannelEntry + + for _, pkg := range pkgs { + for _, ch := range pkg.Channels { + b := ch.Bundles[ch.Head] + provides, err := doesBundleProvide(ctx, c, b.Package, b.Channel, b.Name, group, version, kind) + if err != nil { + return nil, err + } + if provides { + entries = append(entries, pkgs.channelEntriesForBundle(b, false)...) + } + } + } + if len(entries) == 0 { + return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) + } + return entries, nil +} + +func (pkgs packageIndex) GetBundleThatProvides(ctx context.Context, c Cache, group, version, kind string) (*api.Bundle, error) { + latestEntries, err := c.GetLatestChannelEntriesThatProvide(ctx, group, version, kind) + if err != nil { + return nil, err + } + + // It's possible for multiple packages to provide an API, but this function is forced to choose one. + // To do that deterministically, we'll pick the the bundle based on a lexicographical sort of its + // package name. + sort.Slice(latestEntries, func(i, j int) bool { + return latestEntries[i].PackageName < latestEntries[j].PackageName + }) + + for _, entry := range latestEntries { + pkg, ok := pkgs[entry.PackageName] + if !ok { + // This should never happen because the latest entries were + // collected based on iterating over the packages in q.packageIndex. 
+ continue + } + if entry.ChannelName == pkg.DefaultChannel { + return c.GetBundle(ctx, entry.PackageName, entry.ChannelName, entry.BundleName) + } + } + return nil, fmt.Errorf("no entry found that provides group:%q version:%q kind:%q", group, version, kind) +} + +type cPkg struct { + Name string `json:"name"` + Description string `json:"description"` + Icon *declcfg.Icon `json:"icon"` + DefaultChannel string `json:"defaultChannel"` + Channels map[string]cChannel +} + +type cChannel struct { + Name string + Head string + Bundles map[string]cBundle +} + +type cBundle struct { + Package string `json:"package"` + Channel string `json:"channel"` + Name string `json:"name"` + Replaces string `json:"replaces"` + Skips []string `json:"skips"` +} + +func packagesFromModel(m model.Model) (map[string]cPkg, error) { + pkgs := map[string]cPkg{} + for _, p := range m { + newP := cPkg{ + Name: p.Name, + Description: p.Description, + DefaultChannel: p.DefaultChannel.Name, + Channels: map[string]cChannel{}, + } + if p.Icon != nil { + newP.Icon = &declcfg.Icon{ + Data: p.Icon.Data, + MediaType: p.Icon.MediaType, + } + } + for _, ch := range p.Channels { + head, err := ch.Head() + if err != nil { + return nil, err + } + newCh := cChannel{ + Name: ch.Name, + Head: head.Name, + Bundles: map[string]cBundle{}, + } + for _, b := range ch.Bundles { + newB := cBundle{ + Package: b.Package.Name, + Channel: b.Channel.Name, + Name: b.Name, + Replaces: b.Replaces, + Skips: b.Skips, + } + newCh.Bundles[b.Name] = newB + } + newP.Channels[ch.Name] = newCh + } + pkgs[p.Name] = newP + } + return pkgs, nil +} + +func bundleReplaces(b cBundle, name string) bool { + if b.Replaces == name { + return true + } + for _, s := range b.Skips { + if s == name { + return true + } + } + return false +} + +func channelEntriesThatReplace(b cBundle, name string) []*registry.ChannelEntry { + var entries []*registry.ChannelEntry + if b.Replaces == name { + entries = append(entries, ®istry.ChannelEntry{ + 
PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: b.Replaces, + }) + } + for _, s := range b.Skips { + if s == name && s != b.Replaces { + entries = append(entries, ®istry.ChannelEntry{ + PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: b.Replaces, + }) + } + } + return entries +} + +func (pkgs packageIndex) channelEntriesForBundle(b cBundle, ignoreChannel bool) []*registry.ChannelEntry { + entries := []*registry.ChannelEntry{{ + PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: b.Replaces, + }} + for _, s := range b.Skips { + // Ignore skips that duplicate b.Replaces. Also, only add it if its + // in the same channel as b (or we're ignoring channel presence). + if _, inChannel := pkgs[b.Package].Channels[b.Channel].Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { + entries = append(entries, ®istry.ChannelEntry{ + PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: s, + }) + } + } + return entries +} diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_unix.go b/staging/operator-registry/pkg/cache/syscall_unix.go similarity index 84% rename from vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_unix.go rename to staging/operator-registry/pkg/cache/syscall_unix.go index b1edcf59fd..93372adb4e 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_unix.go +++ b/staging/operator-registry/pkg/cache/syscall_unix.go @@ -1,7 +1,7 @@ //go:build !windows // +build !windows -package registry +package cache import "golang.org/x/sys/unix" diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_windows.go b/staging/operator-registry/pkg/cache/syscall_windows.go similarity index 82% rename from vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_windows.go rename to 
staging/operator-registry/pkg/cache/syscall_windows.go index 525c656f1c..7ff5ad8e86 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/syscall_windows.go +++ b/staging/operator-registry/pkg/cache/syscall_windows.go @@ -1,6 +1,6 @@ //go:build windows // +build windows -package registry +package cache var umask = func(i int) int { return 0 } diff --git a/staging/operator-registry/pkg/registry/tar.go b/staging/operator-registry/pkg/cache/tar.go similarity index 98% rename from staging/operator-registry/pkg/registry/tar.go rename to staging/operator-registry/pkg/cache/tar.go index f62a15da85..b368e011e9 100644 --- a/staging/operator-registry/pkg/registry/tar.go +++ b/staging/operator-registry/pkg/cache/tar.go @@ -1,4 +1,4 @@ -package registry +package cache import ( "archive/tar" diff --git a/staging/operator-registry/pkg/cache/tar_test.go b/staging/operator-registry/pkg/cache/tar_test.go new file mode 100644 index 0000000000..fc3c68b976 --- /dev/null +++ b/staging/operator-registry/pkg/cache/tar_test.go @@ -0,0 +1,58 @@ +package cache + +import ( + "bytes" + "errors" + "fmt" + "hash/fnv" + "io/fs" + "testing" + "testing/fstest" + + "github.com/stretchr/testify/require" +) + +func Test_fsToTar(t *testing.T) { + type testCase struct { + name string + fsys func() fs.FS + expect func(*testing.T, []byte, error) + } + testCases := []testCase{ + { + name: "non-existent fs path", + fsys: func() fs.FS { + notExist, _ := fs.Sub(fstest.MapFS{}, "sub") + return notExist + }, + expect: func(t *testing.T, bytes []byte, err error) { + require.True(t, errors.Is(err, fs.ErrNotExist)) + }, + }, + { + // NOTE: The entire purpose of this test is to ensure that the fsToTar implementation + // is stable over time + // + // Therefore, DO NOT CHANGE the expected digest value here unless validFS also + // changes. + // + // If validFS needs to change DO NOT CHANGE the fsToTar implementation in the same + // pull request. 
+ name: "stable hash output", + fsys: func() fs.FS { return validFS }, + expect: func(t *testing.T, i []byte, err error) { + require.NoError(t, err) + hasher := fnv.New64a() + hasher.Write(i) + require.Equal(t, "6f9eec5b366c557f", fmt.Sprintf("%x", hasher.Sum(nil))) + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := bytes.Buffer{} + err := fsToTar(&w, tc.fsys()) + tc.expect(t, w.Bytes(), err) + }) + } +} diff --git a/staging/operator-registry/pkg/lib/registry/registry_test.go b/staging/operator-registry/pkg/lib/registry/registry_test.go index e55614ea25..4b74a671b8 100644 --- a/staging/operator-registry/pkg/lib/registry/registry_test.go +++ b/staging/operator-registry/pkg/lib/registry/registry_test.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "math/rand" "os" @@ -14,13 +15,16 @@ import ( "testing/fstest" "time" + "github.com/blang/semver/v4" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "sigs.k8s.io/yaml" + "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-registry/alpha/model" "github.com/operator-framework/operator-registry/alpha/property" + "github.com/operator-framework/operator-registry/pkg/cache" "github.com/operator-framework/operator-registry/pkg/image" "github.com/operator-framework/operator-registry/pkg/lib/bundle" "github.com/operator-framework/operator-registry/pkg/registry" @@ -31,7 +35,7 @@ func fakeBundlePathFromName(name string) string { return fmt.Sprintf("%s-path", name) } -func newQuerier(t *testing.T, bundles []*model.Bundle) *registry.Querier { +func newCache(t *testing.T, bundles []*model.Bundle) cache.Cache { t.Helper() pkgs := map[string]*model.Package{} channels := map[string]map[string]*model.Channel{} @@ -77,7 +81,7 @@ func newQuerier(t *testing.T, bundles []*model.Bundle) *registry.Querier { if !pkgPropertyFound { pkgJson, _ := 
json.Marshal(property.Package{ PackageName: b.Package.Name, - Version: b.Name, + Version: b.Version.String(), }) b.Properties = append(b.Properties, property.Property{ Type: property.TypePackage, @@ -85,8 +89,22 @@ func newQuerier(t *testing.T, bundles []*model.Bundle) *registry.Querier { }) } } - reg, err := registry.NewQuerier(pkgs) + fbc := declcfg.ConvertFromModel(pkgs) + + fbcDir := t.TempDir() + cacheDir := t.TempDir() + + fbcFile, err := os.Create(filepath.Join(fbcDir, "catalog.json")) + require.NoError(t, err) + require.NoError(t, declcfg.WriteJSON(fbc, fbcFile)) + require.NoError(t, fbcFile.Close()) + + reg, err := cache.New(cacheDir) require.NoError(t, err) + + require.NoError(t, reg.Build(os.DirFS(fbcDir))) + require.NoError(t, reg.Load()) + return reg } @@ -105,16 +123,18 @@ func TestCheckForBundlePaths(t *testing.T) { }{ { description: "BundleListPresent", - querier: newQuerier(t, []*model.Bundle{ + querier: newCache(t, []*model.Bundle{ { Package: &model.Package{Name: "pkg-0"}, Channel: &model.Channel{Name: "stable"}, Name: "csv-a", + Version: semver.MustParse("1.0.0"), }, { Package: &model.Package{Name: "pkg-0"}, Channel: &model.Channel{Name: "alpha"}, Name: "csv-b", + Version: semver.MustParse("2.0.0"), }, }), checkPaths: []string{ @@ -128,16 +148,18 @@ func TestCheckForBundlePaths(t *testing.T) { }, { description: "BundleListPartiallyMissing", - querier: newQuerier(t, []*model.Bundle{ + querier: newCache(t, []*model.Bundle{ { Package: &model.Package{Name: "pkg-0"}, Channel: &model.Channel{Name: "stable"}, Name: "csv-a", + Version: semver.MustParse("1.0.0"), }, { Package: &model.Package{Name: "pkg-0"}, Channel: &model.Channel{Name: "alpha"}, Name: "csv-b", + Version: semver.MustParse("2.0.0"), }, }), checkPaths: []string{ @@ -152,7 +174,7 @@ func TestCheckForBundlePaths(t *testing.T) { }, { description: "EmptyRegistry", - querier: newQuerier(t, nil), + querier: newCache(t, nil), checkPaths: []string{ fakeBundlePathFromName("missing"), }, @@ -163,11 
+185,12 @@ func TestCheckForBundlePaths(t *testing.T) { }, { description: "EmptyDeprecateList", - querier: newQuerier(t, []*model.Bundle{ + querier: newCache(t, []*model.Bundle{ { Package: &model.Package{Name: "pkg-0"}, Channel: &model.Channel{Name: "stable"}, Name: "csv-a", + Version: semver.MustParse("1.0.0"), }, }), checkPaths: []string{}, @@ -191,7 +214,7 @@ func TestCheckForBundlePaths(t *testing.T) { for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { found, missing, err := checkForBundlePaths(tt.querier, tt.checkPaths) - if qc, ok := tt.querier.(*registry.Querier); ok { + if qc, ok := tt.querier.(io.Closer); ok { defer qc.Close() } if tt.expected.err != nil { diff --git a/staging/operator-registry/pkg/registry/query.go b/staging/operator-registry/pkg/registry/query.go deleted file mode 100644 index 4c4212217f..0000000000 --- a/staging/operator-registry/pkg/registry/query.go +++ /dev/null @@ -1,661 +0,0 @@ -package registry - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "hash/fnv" - "io/fs" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-registry/alpha/model" - "github.com/operator-framework/operator-registry/pkg/api" -) - -const ( - cachePermissionDir = 0750 - cachePermissionFile = 0640 -) - -type Querier struct { - *cache -} - -func (q Querier) Close() error { - return q.cache.close() -} - -type apiBundleKey struct { - pkgName string - chName string - name string -} - -type SliceBundleSender []*api.Bundle - -func (s *SliceBundleSender) Send(b *api.Bundle) error { - - *s = append(*s, b) - return nil -} - -var _ GRPCQuery = &Querier{} - -func NewQuerierFromFS(fbcFS fs.FS, cacheDir string) (*Querier, error) { - q := &Querier{} - var err error - q.cache, err = newCache(cacheDir, &fbcCacheModel{ - FBC: fbcFS, - Cache: os.DirFS(cacheDir), - }) - if err != nil { - return q, err - } - return q, nil -} - -func NewQuerier(m 
model.Model) (*Querier, error) { - q := &Querier{} - var err error - q.cache, err = newCache("", &nonDigestableModel{Model: m}) - if err != nil { - return q, err - } - return q, nil -} - -func (q Querier) loadAPIBundle(k apiBundleKey) (*api.Bundle, error) { - filename, ok := q.apiBundles[k] - if !ok { - return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", k.pkgName, k.chName, k.name) - } - d, err := os.ReadFile(filename) - if err != nil { - return nil, err - } - var b api.Bundle - if err := json.Unmarshal(d, &b); err != nil { - return nil, err - } - return &b, nil -} - -func (q Querier) ListPackages(_ context.Context) ([]string, error) { - var packages []string - for pkgName := range q.pkgs { - packages = append(packages, pkgName) - } - return packages, nil -} - -func (q Querier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - var bundleSender SliceBundleSender - - err := q.SendBundles(ctx, &bundleSender) - if err != nil { - return nil, err - } - - return bundleSender, nil -} - -func (q Querier) SendBundles(_ context.Context, s BundleSender) error { - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - if apiBundle.BundlePath != "" { - // The SQLite-based server - // configures its querier to - // omit these fields when - // bundle path is set. 
- apiBundle.CsvJson = "" - apiBundle.Object = nil - } - if err := s.Send(apiBundle); err != nil { - return err - } - } - } - } - return nil -} - -func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, error) { - pkg, ok := q.pkgs[name] - if !ok { - return nil, fmt.Errorf("package %q not found", name) - } - - var channels []PackageChannel - for _, ch := range pkg.Channels { - channels = append(channels, PackageChannel{ - Name: ch.Name, - CurrentCSVName: ch.Head, - }) - } - return &PackageManifest{ - PackageName: pkg.Name, - Channels: channels, - DefaultChannelName: pkg.DefaultChannel, - }, nil -} - -func (q Querier) GetBundle(_ context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) { - pkg, ok := q.pkgs[pkgName] - if !ok { - return nil, fmt.Errorf("package %q not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - b, ok := ch.Bundles[csvName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", pkgName, channelName, csvName) - } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - - // unset Replaces and Skips (sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil -} - -func (q Querier) GetBundleForChannel(_ context.Context, pkgName string, channelName string) (*api.Bundle, error) { - pkg, ok := q.pkgs[pkgName] - if !ok { - return nil, fmt.Errorf("package %q not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, ch.Head}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", ch.Head, err) - } - - // unset Replaces and Skips 
(sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil -} - -func (q Querier) GetChannelEntriesThatReplace(_ context.Context, name string) ([]*ChannelEntry, error) { - var entries []*ChannelEntry - - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - entries = append(entries, channelEntriesThatReplace(b, name)...) - } - } - } - if len(entries) == 0 { - return nil, fmt.Errorf("no channel entries found that replace %s", name) - } - return entries, nil -} - -func (q Querier) GetBundleThatReplaces(_ context.Context, name, pkgName, channelName string) (*api.Bundle, error) { - pkg, ok := q.pkgs[pkgName] - if !ok { - return nil, fmt.Errorf("package %s not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - - // NOTE: iterating over a map is non-deterministic in Go, so if multiple bundles replace this one, - // the bundle returned by this function is also non-deterministic. The sqlite implementation - // is ALSO non-deterministic because it doesn't use ORDER BY, so its probably okay for this - // implementation to be non-deterministic as well. 
- for _, b := range ch.Bundles { - if bundleReplaces(b, name) { - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - - // unset Replaces and Skips (sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil - } - } - return nil, fmt.Errorf("no entry found for package %q, channel %q", pkgName, channelName) -} - -func (q Querier) GetChannelEntriesThatProvide(_ context.Context, group, version, kind string) ([]*ChannelEntry, error) { - var entries []*ChannelEntry - - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - provides, err := q.doesModelBundleProvide(b, group, version, kind) - if err != nil { - return nil, err - } - if provides { - // TODO(joelanford): It seems like the SQLite query returns - // invalid entries (i.e. where bundle `Replaces` isn't actually - // in channel `ChannelName`). Is that a bug? For now, this mimics - // the sqlite server and returns seemingly invalid channel entries. - // Don't worry about this. Not used anymore. - - entries = append(entries, q.channelEntriesForBundle(b, true)...) - } - } - } - } - if len(entries) == 0 { - return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) - } - return entries, nil -} - -// TODO(joelanford): Need to review the expected functionality of this function. I ran -// some experiments with the sqlite version of this function and it seems to only return -// channel heads that provide the GVK (rather than searching down the graph if parent bundles -// don't provide the API). Based on that, this function currently looks at channel heads only. -// --- -// Separate, but possibly related, I noticed there are several channels in the channel entry -// table who's minimum depth is 1. 
What causes 1 to be minimum depth in some cases and 0 in others? -func (q Querier) GetLatestChannelEntriesThatProvide(_ context.Context, group, version, kind string) ([]*ChannelEntry, error) { - var entries []*ChannelEntry - - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - b := ch.Bundles[ch.Head] - provides, err := q.doesModelBundleProvide(b, group, version, kind) - if err != nil { - return nil, err - } - if provides { - entries = append(entries, q.channelEntriesForBundle(b, false)...) - } - } - } - if len(entries) == 0 { - return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) - } - return entries, nil -} - -func (q Querier) GetBundleThatProvides(ctx context.Context, group, version, kind string) (*api.Bundle, error) { - latestEntries, err := q.GetLatestChannelEntriesThatProvide(ctx, group, version, kind) - if err != nil { - return nil, err - } - - // It's possible for multiple packages to provide an API, but this function is forced to choose one. - // To do that deterministically, we'll pick the the bundle based on a lexicographical sort of its - // package name. - sort.Slice(latestEntries, func(i, j int) bool { - return latestEntries[i].PackageName < latestEntries[j].PackageName - }) - - for _, entry := range latestEntries { - pkg, ok := q.pkgs[entry.PackageName] - if !ok { - // This should never happen because the latest entries were - // collected based on iterating over the packages in q.pkgs. 
- continue - } - if entry.ChannelName == pkg.DefaultChannel { - return q.GetBundle(ctx, entry.PackageName, entry.ChannelName, entry.BundleName) - } - } - return nil, fmt.Errorf("no entry found that provides group:%q version:%q kind:%q", group, version, kind) -} - -func (q Querier) doesModelBundleProvide(b cBundle, group, version, kind string) (bool, error) { - apiBundle, err := q.loadAPIBundle(apiBundleKey{b.Package, b.Channel, b.Name}) - if err != nil { - return false, fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - for _, gvk := range apiBundle.ProvidedApis { - if gvk.Group == group && gvk.Version == version && gvk.Kind == kind { - return true, nil - } - } - return false, nil -} - -func bundleReplaces(b cBundle, name string) bool { - if b.Replaces == name { - return true - } - for _, s := range b.Skips { - if s == name { - return true - } - } - return false -} - -func channelEntriesThatReplace(b cBundle, name string) []*ChannelEntry { - var entries []*ChannelEntry - if b.Replaces == name { - entries = append(entries, &ChannelEntry{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: b.Replaces, - }) - } - for _, s := range b.Skips { - if s == name && s != b.Replaces { - entries = append(entries, &ChannelEntry{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: b.Replaces, - }) - } - } - return entries -} - -func (q Querier) channelEntriesForBundle(b cBundle, ignoreChannel bool) []*ChannelEntry { - entries := []*ChannelEntry{{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: b.Replaces, - }} - for _, s := range b.Skips { - // Ignore skips that duplicate b.Replaces. Also, only add it if its - // in the same channel as b (or we're ignoring channel presence). 
- if _, inChannel := q.pkgs[b.Package].Channels[b.Channel].Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { - entries = append(entries, &ChannelEntry{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: s, - }) - } - } - return entries -} - -type cache struct { - digest string - baseDir string - persist bool - pkgs map[string]cPkg - apiBundles map[apiBundleKey]string -} - -func newCache(baseDir string, model digestableModel) (*cache, error) { - var ( - qc *cache - err error - ) - if baseDir == "" { - qc, err = newEphemeralCache() - } else { - qc, err = newPersistentCache(baseDir) - } - if err != nil { - return nil, err - } - return qc, qc.load(model) -} - -func (qc cache) close() error { - if qc.persist { - return nil - } - return os.RemoveAll(qc.baseDir) -} - -func newEphemeralCache() (*cache, error) { - baseDir, err := os.MkdirTemp("", "opm-serve-cache-") - if err != nil { - return nil, err - } - if err := os.MkdirAll(filepath.Join(baseDir, "cache"), cachePermissionDir); err != nil { - return nil, err - } - return &cache{ - digest: "", - baseDir: baseDir, - persist: false, - }, nil -} - -func newPersistentCache(baseDir string) (*cache, error) { - if err := os.MkdirAll(baseDir, cachePermissionDir); err != nil { - return nil, err - } - qc := &cache{baseDir: baseDir, persist: true} - if digest, err := os.ReadFile(filepath.Join(baseDir, "digest")); err == nil { - qc.digest = strings.TrimSpace(string(digest)) - } - return qc, nil -} - -func (qc *cache) load(model digestableModel) error { - computedDigest, err := model.GetDigest() - if err != nil && !errors.Is(err, errNonDigestable) { - return fmt.Errorf("compute digest: %v", err) - } - if err == nil && computedDigest == qc.digest { - err = qc.loadFromCache() - if err == nil { - return nil - } - // if there _was_ an error loading from the cache, - // we'll drop down and repopulate from scratch. 
- } - return qc.repopulateCache(model) -} - -func (qc *cache) loadFromCache() error { - packagesData, err := os.ReadFile(filepath.Join(qc.baseDir, "cache", "packages.json")) - if err != nil { - return err - } - if err := json.Unmarshal(packagesData, &qc.pkgs); err != nil { - return err - } - qc.apiBundles = map[apiBundleKey]string{} - for _, p := range qc.pkgs { - for _, ch := range p.Channels { - for _, b := range ch.Bundles { - filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) - qc.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename - } - } - } - return nil -} - -func (qc *cache) repopulateCache(model digestableModel) error { - // ensure that generated cache is available to all future users - oldUmask := umask(000) - defer umask(oldUmask) - - m, err := model.GetModel() - if err != nil { - return err - } - cacheDirEntries, err := os.ReadDir(qc.baseDir) - if err != nil { - return err - } - for _, e := range cacheDirEntries { - if err := os.RemoveAll(filepath.Join(qc.baseDir, e.Name())); err != nil { - return err - } - } - if err := os.MkdirAll(filepath.Join(qc.baseDir, "cache"), cachePermissionDir); err != nil { - return err - } - - qc.pkgs, err = packagesFromModel(m) - if err != nil { - return err - } - - packageJson, err := json.Marshal(qc.pkgs) - if err != nil { - return err - } - if err := os.WriteFile(filepath.Join(qc.baseDir, "cache", "packages.json"), packageJson, cachePermissionFile); err != nil { - return err - } - - qc.apiBundles = map[apiBundleKey]string{} - for _, p := range m { - for _, ch := range p.Channels { - for _, b := range ch.Bundles { - apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) - if err != nil { - return err - } - jsonBundle, err := json.Marshal(apiBundle) - if err != nil { - return err - } - filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) - if err := os.WriteFile(filename, jsonBundle, 
cachePermissionFile); err != nil { - return err - } - qc.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename - } - } - } - computedHash, err := model.GetDigest() - if err == nil { - if err := os.WriteFile(filepath.Join(qc.baseDir, "digest"), []byte(computedHash), cachePermissionFile); err != nil { - return err - } - } else if !errors.Is(err, errNonDigestable) { - return fmt.Errorf("compute digest: %v", err) - } - return nil -} - -func packagesFromModel(m model.Model) (map[string]cPkg, error) { - pkgs := map[string]cPkg{} - for _, p := range m { - newP := cPkg{ - Name: p.Name, - Description: p.Description, - DefaultChannel: p.DefaultChannel.Name, - Channels: map[string]cChannel{}, - } - if p.Icon != nil { - newP.Icon = &declcfg.Icon{ - Data: p.Icon.Data, - MediaType: p.Icon.MediaType, - } - } - for _, ch := range p.Channels { - head, err := ch.Head() - if err != nil { - return nil, err - } - newCh := cChannel{ - Name: ch.Name, - Head: head.Name, - Bundles: map[string]cBundle{}, - } - for _, b := range ch.Bundles { - newB := cBundle{ - Package: b.Package.Name, - Channel: b.Channel.Name, - Name: b.Name, - Replaces: b.Replaces, - Skips: b.Skips, - } - newCh.Bundles[b.Name] = newB - } - newP.Channels[ch.Name] = newCh - } - pkgs[p.Name] = newP - } - return pkgs, nil -} - -type cPkg struct { - Name string `json:"name"` - Description string `json:"description"` - Icon *declcfg.Icon `json:"icon"` - DefaultChannel string `json:"defaultChannel"` - Channels map[string]cChannel -} - -type cChannel struct { - Name string - Head string - Bundles map[string]cBundle -} - -type cBundle struct { - Package string `json:"package"` - Channel string `json:"channel"` - Name string `json:"name"` - Replaces string `json:"replaces"` - Skips []string `json:"skips"` -} - -type digestableModel interface { - GetModel() (model.Model, error) - GetDigest() (string, error) -} - -type fbcCacheModel struct { - FBC fs.FS - Cache fs.FS -} - -func (m *fbcCacheModel) GetModel() (model.Model, 
error) { - fbc, err := declcfg.LoadFS(m.FBC) - if err != nil { - return nil, err - } - return declcfg.ConvertToModel(*fbc) -} - -func (m *fbcCacheModel) GetDigest() (string, error) { - computedHasher := fnv.New64a() - if err := fsToTar(computedHasher, m.FBC); err != nil { - return "", err - } - if cacheFS, err := fs.Sub(m.Cache, "cache"); err == nil { - if err := fsToTar(computedHasher, cacheFS); err != nil && !errors.Is(err, os.ErrNotExist) { - return "", fmt.Errorf("compute hash: %v", err) - } - } - return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil -} - -var errNonDigestable = errors.New("cannot generate digest") - -type nonDigestableModel struct { - model.Model -} - -func (m *nonDigestableModel) GetModel() (model.Model, error) { - return m.Model, nil -} - -func (m *nonDigestableModel) GetDigest() (string, error) { - return "", errNonDigestable -} diff --git a/staging/operator-registry/pkg/server/server_test.go b/staging/operator-registry/pkg/server/server_test.go index 09117ee844..a3f001d123 100644 --- a/staging/operator-registry/pkg/server/server_test.go +++ b/staging/operator-registry/pkg/server/server_test.go @@ -2,7 +2,6 @@ package server import ( "io" - "io/ioutil" "net" "os" "path/filepath" @@ -17,7 +16,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/connectivity" + "github.com/operator-framework/operator-registry/alpha/action" + "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-registry/pkg/api" + cache2 "github.com/operator-framework/operator-registry/pkg/cache" "github.com/operator-framework/operator-registry/pkg/registry" "github.com/operator-framework/operator-registry/pkg/sqlite" ) @@ -27,13 +29,11 @@ const ( dbAddress = "localhost" + dbPort dbName = "test.db" - cfgPort = ":50053" - cfgAddress = "localhost" + cfgPort + jsonCachePort = ":50053" + jsonCacheAddress = "localhost" + jsonCachePort ) -func dbStore(dbPath string) *sqlite.SQLQuerier { - _ = os.Remove(dbPath) - +func 
createDBStore(dbPath string) *sqlite.SQLQuerier { db, err := sqlite.Open(dbPath) if err != nil { logrus.Fatal(err) @@ -60,22 +60,12 @@ func dbStore(dbPath string) *sqlite.SQLQuerier { return store } -func cfgStore() (*registry.Querier, error) { - tmpDir, err := ioutil.TempDir("", "server_test-") - if err != nil { +func fbcJsonCache(catalogDir, cacheDir string) (cache2.Cache, error) { + store := cache2.NewJSON(cacheDir) + if err := store.Build(os.DirFS(catalogDir)); err != nil { return nil, err } - defer os.RemoveAll(tmpDir) - - dbFile := filepath.Join(tmpDir, "test.db") - - dbStore := dbStore(dbFile) - m, err := sqlite.ToModel(context.TODO(), dbStore) - if err != nil { - return nil, err - } - store, err := registry.NewQuerier(m) - if err != nil { + if err := store.Load(); err != nil { return nil, err } return store, nil @@ -88,14 +78,37 @@ func server(store registry.GRPCQuery) *grpc.Server { } func TestMain(m *testing.M) { - s1 := server(dbStore(dbName)) + tmpDir, err := os.MkdirTemp("", "operator-registry-server-test-") + if err != nil { + logrus.Fatal(err) + } + defer func() { + if err := os.RemoveAll(tmpDir); err != nil { + logrus.Fatalf("couldn't remove test directory: %v", err) + } + }() + + dbFile := filepath.Join(tmpDir, "test.db") + dbStore := createDBStore(dbFile) + + fbcDir := filepath.Join(tmpDir, "fbc") + fbcMigrate := action.Migrate{ + CatalogRef: dbFile, + OutputDir: fbcDir, + WriteFunc: declcfg.WriteJSON, + FileExt: ".json", + } + if err := fbcMigrate.Run(context.TODO()); err != nil { + logrus.Fatal(err) + } - cfgQuerier, err := cfgStore() - defer cfgQuerier.Close() + s1 := server(dbStore) + + fbcJsonStore, err := fbcJsonCache(fbcDir, filepath.Join(tmpDir, "json-cache")) if err != nil { - logrus.Fatalf("failed to create fbc querier: %v", err) + logrus.Fatalf("failed to create json cache: %v", err) } - s2 := server(cfgQuerier) + s2 := server(fbcJsonStore) go func() { lis, err := net.Listen("tcp", dbPort) if err != nil { @@ -106,18 +119,15 @@ func 
TestMain(m *testing.M) { } }() go func() { - lis, err := net.Listen("tcp", cfgPort) + lis, err := net.Listen("tcp", jsonCachePort) if err != nil { logrus.Fatalf("failed to listen: %v", err) } if err := s2.Serve(lis); err != nil { - logrus.Fatalf("failed to serve configs: %v", err) + logrus.Fatalf("failed to serve fbc json cache: %v", err) } }() exit := m.Run() - if err := os.Remove(dbName); err != nil { - logrus.Fatalf("couldn't remove db") - } os.Exit(exit) } @@ -135,7 +145,7 @@ func client(t *testing.T, address string) (api.RegistryClient, *grpc.ClientConn) func TestListPackages(t *testing.T) { t.Run("Sqlite", testListPackages(dbAddress)) - t.Run("DeclarativeConfig", testListPackages(cfgAddress)) + t.Run("FBCJsonCache", testListPackages(jsonCacheAddress)) } func testListPackages(addr string) func(*testing.T) { @@ -167,7 +177,7 @@ func testListPackages(addr string) func(*testing.T) { func TestGetPackage(t *testing.T) { t.Run("Sqlite", testGetPackage(dbAddress)) - t.Run("DeclarativeConfig", testGetPackage(cfgAddress)) + t.Run("FBCJsonCache", testGetPackage(jsonCacheAddress)) } func testGetPackage(addr string) func(*testing.T) { @@ -208,7 +218,7 @@ func testGetPackage(addr string) func(*testing.T) { func TestGetBundle(t *testing.T) { t.Run("Sqlite", testGetBundle(dbAddress, etcdoperator_v0_9_2("alpha", false, false))) - t.Run("DeclarativeConfig", testGetBundle(cfgAddress, etcdoperator_v0_9_2("alpha", false, true))) + t.Run("FBCJsonCache", testGetBundle(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true))) } func testGetBundle(addr string, expected *api.Bundle) func(*testing.T) { @@ -231,7 +241,7 @@ func TestGetBundleForChannel(t *testing.T) { CsvJson: b.CsvJson + "\n", })) } - t.Run("DeclarativeConfig", testGetBundleForChannel(cfgAddress, etcdoperator_v0_9_2("alpha", false, true))) + t.Run("FBCJsonCache", testGetBundleForChannel(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true))) } func testGetBundleForChannel(addr string, expected *api.Bundle) 
func(*testing.T) { @@ -247,7 +257,7 @@ func testGetBundleForChannel(addr string, expected *api.Bundle) func(*testing.T) func TestGetChannelEntriesThatReplace(t *testing.T) { t.Run("Sqlite", testGetChannelEntriesThatReplace(dbAddress)) - t.Run("DeclarativeConfig", testGetChannelEntriesThatReplace(cfgAddress)) + t.Run("FBCJsonCache", testGetChannelEntriesThatReplace(jsonCacheAddress)) } func testGetChannelEntriesThatReplace(addr string) func(*testing.T) { @@ -323,7 +333,7 @@ func testGetChannelEntriesThatReplace(addr string) func(*testing.T) { func TestGetBundleThatReplaces(t *testing.T) { t.Run("Sqlite", testGetBundleThatReplaces(dbAddress, etcdoperator_v0_9_2("alpha", false, false))) - t.Run("DeclarativeConfig", testGetBundleThatReplaces(cfgAddress, etcdoperator_v0_9_2("alpha", false, true))) + t.Run("FBCJsonCache", testGetBundleThatReplaces(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true))) } func testGetBundleThatReplaces(addr string, expected *api.Bundle) func(*testing.T) { @@ -339,7 +349,7 @@ func testGetBundleThatReplaces(addr string, expected *api.Bundle) func(*testing. 
func TestGetBundleThatReplacesSynthetic(t *testing.T) { t.Run("Sqlite", testGetBundleThatReplacesSynthetic(dbAddress, etcdoperator_v0_9_2("alpha", false, false))) - t.Run("DeclarativeConfig", testGetBundleThatReplacesSynthetic(cfgAddress, etcdoperator_v0_9_2("alpha", false, true))) + t.Run("FBCJsonCache", testGetBundleThatReplacesSynthetic(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true))) } func testGetBundleThatReplacesSynthetic(addr string, expected *api.Bundle) func(*testing.T) { @@ -356,7 +366,7 @@ func testGetBundleThatReplacesSynthetic(addr string, expected *api.Bundle) func( func TestGetChannelEntriesThatProvide(t *testing.T) { t.Run("Sqlite", testGetChannelEntriesThatProvide(dbAddress)) - t.Run("DeclarativeConfig", testGetChannelEntriesThatProvide(cfgAddress)) + t.Run("FBCJsonCache", testGetChannelEntriesThatProvide(jsonCacheAddress)) } func testGetChannelEntriesThatProvide(addr string) func(t *testing.T) { @@ -473,7 +483,7 @@ func testGetChannelEntriesThatProvide(addr string) func(t *testing.T) { func TestGetLatestChannelEntriesThatProvide(t *testing.T) { t.Run("Sqlite", testGetLatestChannelEntriesThatProvide(dbAddress)) - t.Run("DeclarativeConfig", testGetLatestChannelEntriesThatProvide(cfgAddress)) + t.Run("FBCJsonCache", testGetLatestChannelEntriesThatProvide(jsonCacheAddress)) } func testGetLatestChannelEntriesThatProvide(addr string) func(t *testing.T) { @@ -549,7 +559,7 @@ func testGetLatestChannelEntriesThatProvide(addr string) func(t *testing.T) { func TestGetDefaultBundleThatProvides(t *testing.T) { t.Run("Sqlite", testGetDefaultBundleThatProvides(dbAddress, etcdoperator_v0_9_2("alpha", false, false))) - t.Run("DeclarativeConfig", testGetDefaultBundleThatProvides(cfgAddress, etcdoperator_v0_9_2("alpha", false, true))) + t.Run("FBCJsonCache", testGetDefaultBundleThatProvides(jsonCacheAddress, etcdoperator_v0_9_2("alpha", false, true))) } func testGetDefaultBundleThatProvides(addr string, expected *api.Bundle) func(*testing.T) { @@ 
-567,7 +577,7 @@ func TestListBundles(t *testing.T) { t.Run("Sqlite", testListBundles(dbAddress, etcdoperator_v0_9_2("alpha", true, false), etcdoperator_v0_9_2("stable", true, false))) - t.Run("DeclarativeConfig", testListBundles(cfgAddress, + t.Run("FBCJsonCache", testListBundles(jsonCacheAddress, etcdoperator_v0_9_2("alpha", true, true), etcdoperator_v0_9_2("stable", true, true))) } diff --git a/staging/operator-registry/pkg/sqlite/query.go b/staging/operator-registry/pkg/sqlite/query.go index 6d46d544c2..24880f1fca 100644 --- a/staging/operator-registry/pkg/sqlite/query.go +++ b/staging/operator-registry/pkg/sqlite/query.go @@ -1065,8 +1065,15 @@ func (s *SQLQuerier) SendBundles(ctx context.Context, stream registry.BundleSend return nil } +type sliceBundleSender []*api.Bundle + +func (s *sliceBundleSender) Send(b *api.Bundle) error { + *s = append(*s, b) + return nil +} + func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - var bundleSender registry.SliceBundleSender + var bundleSender sliceBundleSender err := s.SendBundles(ctx, &bundleSender) if err != nil { return nil, err diff --git a/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go b/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go index 633ad9ef3f..8b7d280764 100644 --- a/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go +++ b/vendor/github.com/operator-framework/operator-registry/cmd/opm/serve/serve.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "net" "net/http" endpoint "net/http/pprof" @@ -19,17 +20,18 @@ import ( "github.com/operator-framework/operator-registry/pkg/api" health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1" + "github.com/operator-framework/operator-registry/pkg/cache" "github.com/operator-framework/operator-registry/pkg/lib/dns" "github.com/operator-framework/operator-registry/pkg/lib/graceful" 
"github.com/operator-framework/operator-registry/pkg/lib/log" - "github.com/operator-framework/operator-registry/pkg/registry" "github.com/operator-framework/operator-registry/pkg/server" ) type serve struct { - configDir string - cacheDir string - cacheOnly bool + configDir string + cacheDir string + cacheOnly bool + cacheEnforceIntegrity bool port string terminationLog string @@ -59,15 +61,19 @@ startup. Changes made to the declarative config after the this command starts will not be reflected in the served content. `, Args: cobra.ExactArgs(1), - PreRunE: func(_ *cobra.Command, args []string) error { + PreRun: func(_ *cobra.Command, args []string) { s.configDir = args[0] if s.debug { logger.SetLevel(logrus.DebugLevel) } - return nil }, - RunE: func(cmd *cobra.Command, _ []string) error { - return s.run(cmd.Context()) + Run: func(cmd *cobra.Command, _ []string) { + if !cmd.Flags().Changed("cache-enforce-integrity") { + s.cacheEnforceIntegrity = s.cacheDir != "" && !s.cacheOnly + } + if err := s.run(cmd.Context()); err != nil { + logger.Fatal(err) + } }, } @@ -77,6 +83,7 @@ will not be reflected in the served content. cmd.Flags().StringVar(&s.pprofAddr, "pprof-addr", "", "address of startup profiling endpoint (addr:port format)") cmd.Flags().StringVar(&s.cacheDir, "cache-dir", "", "if set, sync and persist server cache directory") cmd.Flags().BoolVar(&s.cacheOnly, "cache-only", false, "sync the serve cache and exit without serving") + cmd.Flags().BoolVar(&s.cacheEnforceIntegrity, "cache-enforce-integrity", false, "exit with error if cache is not present or has been invalidated. 
(default: true when --cache-dir is set and --cache-only is false, false otherwise), ") return cmd } @@ -102,11 +109,38 @@ func (s *serve) run(ctx context.Context) error { s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port}) - store, err := registry.NewQuerierFromFS(os.DirFS(s.configDir), s.cacheDir) - defer store.Close() + if s.cacheDir == "" && s.cacheEnforceIntegrity { + return fmt.Errorf("--cache-dir must be specified with --cache-enforce-integrity") + } + + if s.cacheDir == "" { + s.cacheDir, err = os.MkdirTemp("", "opm-serve-cache-") + if err != nil { + return err + } + defer os.RemoveAll(s.cacheDir) + } + + store, err := cache.New(s.cacheDir) if err != nil { return err } + if storeCloser, ok := store.(io.Closer); ok { + defer storeCloser.Close() + } + if s.cacheEnforceIntegrity { + if err := store.CheckIntegrity(os.DirFS(s.configDir)); err != nil { + return err + } + if err := store.Load(); err != nil { + return err + } + } else { + if err := cache.LoadOrRebuild(store, os.DirFS(s.configDir)); err != nil { + return err + } + } + if s.cacheOnly { return nil } diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/cache/cache.go b/vendor/github.com/operator-framework/operator-registry/pkg/cache/cache.go new file mode 100644 index 0000000000..7df08d6f50 --- /dev/null +++ b/vendor/github.com/operator-framework/operator-registry/pkg/cache/cache.go @@ -0,0 +1,104 @@ +package cache + +import ( + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/operator-framework/operator-registry/pkg/api" + "github.com/operator-framework/operator-registry/pkg/registry" +) + +type Cache interface { + registry.GRPCQuery + + CheckIntegrity(fbc fs.FS) error + Build(fbc fs.FS) error + Load() error +} + +func LoadOrRebuild(c Cache, fbc fs.FS) error { + if err := c.CheckIntegrity(fbc); err != nil { + if err := c.Build(fbc); err != nil { + return err + } + } + return 
c.Load() +} + +// New creates a new Cache. It chooses a cache implementation based +// on the files it finds in the cache directory, with a preference for the +// latest iteration of the cache implementation. It returns an error if +// cacheDir exists and contains unexpected files. +func New(cacheDir string) (Cache, error) { + entries, err := os.ReadDir(cacheDir) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("detect cache format: read cache directory: %v", err) + } + jsonCache := sets.NewString(jsonDir, jsonDigestFile) + + found := sets.NewString() + for _, e := range entries { + found.Insert(e.Name()) + } + + // Preference (and currently only supported) is the JSON-based cache implementation. + if found.IsSuperset(jsonCache) || len(entries) == 0 { + return NewJSON(cacheDir), nil + } + + // Anything else is unexpected. + return nil, fmt.Errorf("cache directory has unexpected contents") +} + +func ensureEmptyDir(dir string, mode os.FileMode) error { + if err := os.MkdirAll(dir, mode); err != nil { + return err + } + entries, err := os.ReadDir(dir) + if err != nil { + return err + } + for _, entry := range entries { + if err := os.RemoveAll(filepath.Join(dir, entry.Name())); err != nil { + return err + } + } + return nil +} + +func doesBundleProvide(ctx context.Context, c Cache, pkgName, chName, bundleName, group, version, kind string) (bool, error) { + apiBundle, err := c.GetBundle(ctx, pkgName, chName, bundleName) + if err != nil { + return false, fmt.Errorf("get bundle %q: %v", bundleName, err) + } + for _, gvk := range apiBundle.ProvidedApis { + if gvk.Group == group && gvk.Version == version && gvk.Kind == kind { + return true, nil + } + } + return false, nil +} + +type sliceBundleSender []*api.Bundle + +func (s *sliceBundleSender) Send(b *api.Bundle) error { + *s = append(*s, b) + return nil +} + +func listBundles(ctx context.Context, c Cache) ([]*api.Bundle, error) { + var bundleSender sliceBundleSender + + err := 
c.SendBundles(ctx, &bundleSender) + if err != nil { + return nil, err + } + + return bundleSender, nil +} diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/cache/json.go b/vendor/github.com/operator-framework/operator-registry/pkg/cache/json.go new file mode 100644 index 0000000000..47a54952ba --- /dev/null +++ b/vendor/github.com/operator-framework/operator-registry/pkg/cache/json.go @@ -0,0 +1,257 @@ +package cache + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "hash/fnv" + "io/fs" + "os" + "path/filepath" + "strings" + + "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/pkg/api" + "github.com/operator-framework/operator-registry/pkg/registry" +) + +var _ Cache = &JSON{} + +type JSON struct { + baseDir string + + packageIndex + apiBundles map[apiBundleKey]string +} + +const ( + jsonCacheModeDir = 0750 + jsonCacheModeFile = 0640 +) + +type apiBundleKey struct { + pkgName string + chName string + name string +} + +func (q *JSON) loadAPIBundle(k apiBundleKey) (*api.Bundle, error) { + filename, ok := q.apiBundles[k] + if !ok { + return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", k.pkgName, k.chName, k.name) + } + d, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + var b api.Bundle + if err := json.Unmarshal(d, &b); err != nil { + return nil, err + } + return &b, nil +} + +func (q *JSON) ListBundles(ctx context.Context) ([]*api.Bundle, error) { + return listBundles(ctx, q) +} + +func (q *JSON) SendBundles(_ context.Context, s registry.BundleSender) error { + for _, pkg := range q.packageIndex { + for _, ch := range pkg.Channels { + for _, b := range ch.Bundles { + apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) + if err != nil { + return fmt.Errorf("convert bundle %q: %v", b.Name, err) + } + if apiBundle.BundlePath != "" { + // The SQLite-based server + // configures its querier to + // omit these 
fields when + // bundle path is set. + apiBundle.CsvJson = "" + apiBundle.Object = nil + } + if err := s.Send(apiBundle); err != nil { + return err + } + } + } + } + return nil +} + +func (q *JSON) GetBundle(_ context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) { + pkg, ok := q.packageIndex[pkgName] + if !ok { + return nil, fmt.Errorf("package %q not found", pkgName) + } + ch, ok := pkg.Channels[channelName] + if !ok { + return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) + } + b, ok := ch.Bundles[csvName] + if !ok { + return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", pkgName, channelName, csvName) + } + apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) + if err != nil { + return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) + } + + // unset Replaces and Skips (sqlite query does not populate these fields) + apiBundle.Replaces = "" + apiBundle.Skips = nil + return apiBundle, nil +} + +func (q *JSON) GetBundleForChannel(ctx context.Context, pkgName string, channelName string) (*api.Bundle, error) { + return q.packageIndex.GetBundleForChannel(ctx, q, pkgName, channelName) +} + +func (q *JSON) GetBundleThatReplaces(ctx context.Context, name, pkgName, channelName string) (*api.Bundle, error) { + return q.packageIndex.GetBundleThatReplaces(ctx, q, name, pkgName, channelName) +} + +func (q *JSON) GetChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) { + return q.packageIndex.GetChannelEntriesThatProvide(ctx, q, group, version, kind) +} + +func (q *JSON) GetLatestChannelEntriesThatProvide(ctx context.Context, group, version, kind string) ([]*registry.ChannelEntry, error) { + return q.packageIndex.GetLatestChannelEntriesThatProvide(ctx, q, group, version, kind) +} + +func (q *JSON) GetBundleThatProvides(ctx context.Context, group, version, kind string) (*api.Bundle, error) { + return 
q.packageIndex.GetBundleThatProvides(ctx, q, group, version, kind) +} + +func NewJSON(baseDir string) *JSON { + return &JSON{baseDir: baseDir} +} + +const ( + jsonDigestFile = "digest" + jsonDir = "cache" + packagesFile = jsonDir + string(filepath.Separator) + "packages.json" +) + +func (q *JSON) CheckIntegrity(fbcFsys fs.FS) error { + existingDigest, err := q.existingDigest() + if err != nil { + return fmt.Errorf("read existing cache digest: %v", err) + } + computedDigest, err := q.computeDigest(fbcFsys) + if err != nil { + return fmt.Errorf("compute digest: %v", err) + } + if existingDigest != computedDigest { + return fmt.Errorf("cache requires rebuild: cache reports digest as %q, but computed digest is %q", existingDigest, computedDigest) + } + return nil +} + +func (q *JSON) existingDigest() (string, error) { + existingDigestBytes, err := os.ReadFile(filepath.Join(q.baseDir, jsonDigestFile)) + if err != nil { + return "", err + } + return strings.TrimSpace(string(existingDigestBytes)), nil +} + +func (q *JSON) computeDigest(fbcFsys fs.FS) (string, error) { + computedHasher := fnv.New64a() + if err := fsToTar(computedHasher, fbcFsys); err != nil { + return "", err + } + + if cacheFS, err := fs.Sub(os.DirFS(q.baseDir), jsonDir); err == nil { + if err := fsToTar(computedHasher, cacheFS); err != nil && !errors.Is(err, os.ErrNotExist) { + return "", fmt.Errorf("compute hash: %v", err) + } + } + return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil +} + +func (q *JSON) Build(fbcFsys fs.FS) error { + // ensure that generated cache is available to all future users + oldUmask := umask(000) + defer umask(oldUmask) + + if err := ensureEmptyDir(q.baseDir, jsonCacheModeDir); err != nil { + return fmt.Errorf("ensure clean base directory: %v", err) + } + if err := ensureEmptyDir(filepath.Join(q.baseDir, jsonDir), jsonCacheModeDir); err != nil { + return fmt.Errorf("ensure clean base directory: %v", err) + } + + fbc, err := declcfg.LoadFS(fbcFsys) + if err != nil { + return 
err + } + fbcModel, err := declcfg.ConvertToModel(*fbc) + if err != nil { + return err + } + + pkgs, err := packagesFromModel(fbcModel) + if err != nil { + return err + } + + packageJson, err := json.Marshal(pkgs) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(q.baseDir, packagesFile), packageJson, jsonCacheModeFile); err != nil { + return err + } + + q.apiBundles = map[apiBundleKey]string{} + for _, p := range fbcModel { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) + if err != nil { + return err + } + jsonBundle, err := json.Marshal(apiBundle) + if err != nil { + return err + } + filename := filepath.Join(q.baseDir, jsonDir, fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + if err := os.WriteFile(filename, jsonBundle, jsonCacheModeFile); err != nil { + return err + } + q.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename + } + } + } + digest, err := q.computeDigest(fbcFsys) + if err != nil { + return err + } + if err := os.WriteFile(filepath.Join(q.baseDir, jsonDigestFile), []byte(digest), jsonCacheModeFile); err != nil { + return err + } + return nil +} + +func (q *JSON) Load() error { + packagesData, err := os.ReadFile(filepath.Join(q.baseDir, packagesFile)) + if err != nil { + return err + } + if err := json.Unmarshal(packagesData, &q.packageIndex); err != nil { + return err + } + q.apiBundles = map[apiBundleKey]string{} + for _, p := range q.packageIndex { + for _, ch := range p.Channels { + for _, b := range ch.Bundles { + filename := filepath.Join(q.baseDir, jsonDir, fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) + q.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename + } + } + } + return nil +} diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/cache/pkgs.go b/vendor/github.com/operator-framework/operator-registry/pkg/cache/pkgs.go new file mode 100644 index 
0000000000..d387ddbd09 --- /dev/null +++ b/vendor/github.com/operator-framework/operator-registry/pkg/cache/pkgs.go @@ -0,0 +1,297 @@ +package cache + +import ( + "context" + "fmt" + "sort" + + "github.com/operator-framework/operator-registry/alpha/declcfg" + "github.com/operator-framework/operator-registry/alpha/model" + "github.com/operator-framework/operator-registry/pkg/api" + "github.com/operator-framework/operator-registry/pkg/registry" +) + +type packageIndex map[string]cPkg + +func (pkgs packageIndex) ListPackages(_ context.Context) ([]string, error) { + var packages []string + for pkgName := range pkgs { + packages = append(packages, pkgName) + } + return packages, nil +} + +func (pkgs packageIndex) GetPackage(_ context.Context, name string) (*registry.PackageManifest, error) { + pkg, ok := pkgs[name] + if !ok { + return nil, fmt.Errorf("package %q not found", name) + } + + var channels []registry.PackageChannel + for _, ch := range pkg.Channels { + channels = append(channels, registry.PackageChannel{ + Name: ch.Name, + CurrentCSVName: ch.Head, + }) + } + return ®istry.PackageManifest{ + PackageName: pkg.Name, + Channels: channels, + DefaultChannelName: pkg.DefaultChannel, + }, nil +} + +func (pkgs packageIndex) GetChannelEntriesThatReplace(_ context.Context, name string) ([]*registry.ChannelEntry, error) { + var entries []*registry.ChannelEntry + + for _, pkg := range pkgs { + for _, ch := range pkg.Channels { + for _, b := range ch.Bundles { + entries = append(entries, channelEntriesThatReplace(b, name)...) 
+			}
+		}
+	}
+	if len(entries) == 0 {
+		return nil, fmt.Errorf("no channel entries found that replace %s", name)
+	}
+	return entries, nil
+}
+
+func (pkgs packageIndex) GetBundleForChannel(ctx context.Context, c Cache, pkgName string, channelName string) (*api.Bundle, error) {
+	pkg, ok := pkgs[pkgName]
+	if !ok {
+		return nil, fmt.Errorf("package %q not found", pkgName)
+	}
+	ch, ok := pkg.Channels[channelName]
+	if !ok {
+		return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName)
+	}
+	return c.GetBundle(ctx, pkg.Name, ch.Name, ch.Head)
+}
+
+func (pkgs packageIndex) GetBundleThatReplaces(ctx context.Context, c Cache, name, pkgName, channelName string) (*api.Bundle, error) {
+	pkg, ok := pkgs[pkgName]
+	if !ok {
+		return nil, fmt.Errorf("package %s not found", pkgName)
+	}
+	ch, ok := pkg.Channels[channelName]
+	if !ok {
+		return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName)
+	}
+
+	// NOTE: iterating over a map is non-deterministic in Go, so if multiple bundles replace this one,
+	// the bundle returned by this function is also non-deterministic. The sqlite implementation
+	// is ALSO non-deterministic because it doesn't use ORDER BY, so it's probably okay for this
+	// implementation to be non-deterministic as well.
+	for _, b := range ch.Bundles {
+		if bundleReplaces(b, name) {
+			return c.GetBundle(ctx, pkg.Name, ch.Name, b.Name)
+		}
+	}
+	return nil, fmt.Errorf("no entry found for package %q, channel %q", pkgName, channelName)
+}
+
+func (pkgs packageIndex) GetChannelEntriesThatProvide(ctx context.Context, c Cache, group, version, kind string) ([]*registry.ChannelEntry, error) {
+	var entries []*registry.ChannelEntry
+
+	for _, pkg := range pkgs {
+		for _, ch := range pkg.Channels {
+			for _, b := range ch.Bundles {
+				provides, err := doesBundleProvide(ctx, c, b.Package, b.Channel, b.Name, group, version, kind)
+				if err != nil {
+					return nil, err
+				}
+				if provides {
+					// TODO(joelanford): It seems like the SQLite query returns
+					// invalid entries (i.e. where bundle `Replaces` isn't actually
+					// in channel `ChannelName`). Is that a bug? For now, this mimics
+					// the sqlite server and returns seemingly invalid channel entries.
+					// Don't worry about this. Not used anymore.
+
+					entries = append(entries, pkgs.channelEntriesForBundle(b, true)...)
+				}
+			}
+		}
+	}
+	if len(entries) == 0 {
+		return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind)
+	}
+	return entries, nil
+}
+
+// TODO(joelanford): Need to review the expected functionality of this function. I ran
+//
+//	some experiments with the sqlite version of this function and it seems to only return
+//	channel heads that provide the GVK (rather than searching down the graph if parent bundles
+//	don't provide the API). Based on that, this function currently looks at channel heads only.
+//	---
+//	Separate, but possibly related, I noticed there are several channels in the channel entry
+//	table whose minimum depth is 1. What causes 1 to be minimum depth in some cases and 0 in others?
+func (pkgs packageIndex) GetLatestChannelEntriesThatProvide(ctx context.Context, c Cache, group, version, kind string) ([]*registry.ChannelEntry, error) { + var entries []*registry.ChannelEntry + + for _, pkg := range pkgs { + for _, ch := range pkg.Channels { + b := ch.Bundles[ch.Head] + provides, err := doesBundleProvide(ctx, c, b.Package, b.Channel, b.Name, group, version, kind) + if err != nil { + return nil, err + } + if provides { + entries = append(entries, pkgs.channelEntriesForBundle(b, false)...) + } + } + } + if len(entries) == 0 { + return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) + } + return entries, nil +} + +func (pkgs packageIndex) GetBundleThatProvides(ctx context.Context, c Cache, group, version, kind string) (*api.Bundle, error) { + latestEntries, err := c.GetLatestChannelEntriesThatProvide(ctx, group, version, kind) + if err != nil { + return nil, err + } + + // It's possible for multiple packages to provide an API, but this function is forced to choose one. + // To do that deterministically, we'll pick the bundle based on a lexicographical sort of its + // package name. + sort.Slice(latestEntries, func(i, j int) bool { + return latestEntries[i].PackageName < latestEntries[j].PackageName + }) + + for _, entry := range latestEntries { + pkg, ok := pkgs[entry.PackageName] + if !ok { + // This should never happen because the latest entries were + // collected based on iterating over the packages in q.packageIndex. 
+ continue + } + if entry.ChannelName == pkg.DefaultChannel { + return c.GetBundle(ctx, entry.PackageName, entry.ChannelName, entry.BundleName) + } + } + return nil, fmt.Errorf("no entry found that provides group:%q version:%q kind:%q", group, version, kind) +} + +type cPkg struct { + Name string `json:"name"` + Description string `json:"description"` + Icon *declcfg.Icon `json:"icon"` + DefaultChannel string `json:"defaultChannel"` + Channels map[string]cChannel +} + +type cChannel struct { + Name string + Head string + Bundles map[string]cBundle +} + +type cBundle struct { + Package string `json:"package"` + Channel string `json:"channel"` + Name string `json:"name"` + Replaces string `json:"replaces"` + Skips []string `json:"skips"` +} + +func packagesFromModel(m model.Model) (map[string]cPkg, error) { + pkgs := map[string]cPkg{} + for _, p := range m { + newP := cPkg{ + Name: p.Name, + Description: p.Description, + DefaultChannel: p.DefaultChannel.Name, + Channels: map[string]cChannel{}, + } + if p.Icon != nil { + newP.Icon = &declcfg.Icon{ + Data: p.Icon.Data, + MediaType: p.Icon.MediaType, + } + } + for _, ch := range p.Channels { + head, err := ch.Head() + if err != nil { + return nil, err + } + newCh := cChannel{ + Name: ch.Name, + Head: head.Name, + Bundles: map[string]cBundle{}, + } + for _, b := range ch.Bundles { + newB := cBundle{ + Package: b.Package.Name, + Channel: b.Channel.Name, + Name: b.Name, + Replaces: b.Replaces, + Skips: b.Skips, + } + newCh.Bundles[b.Name] = newB + } + newP.Channels[ch.Name] = newCh + } + pkgs[p.Name] = newP + } + return pkgs, nil +} + +func bundleReplaces(b cBundle, name string) bool { + if b.Replaces == name { + return true + } + for _, s := range b.Skips { + if s == name { + return true + } + } + return false +} + +func channelEntriesThatReplace(b cBundle, name string) []*registry.ChannelEntry { + var entries []*registry.ChannelEntry + if b.Replaces == name { + entries = append(entries, &registry.ChannelEntry{ + 
PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: b.Replaces, + }) + } + for _, s := range b.Skips { + if s == name && s != b.Replaces { + entries = append(entries, &registry.ChannelEntry{ + PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: b.Replaces, + }) + } + } + return entries +} + +func (pkgs packageIndex) channelEntriesForBundle(b cBundle, ignoreChannel bool) []*registry.ChannelEntry { + entries := []*registry.ChannelEntry{{ + PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: b.Replaces, + }} + for _, s := range b.Skips { + // Ignore skips that duplicate b.Replaces. Also, only add it if it's + // in the same channel as b (or we're ignoring channel presence). + if _, inChannel := pkgs[b.Package].Channels[b.Channel].Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { + entries = append(entries, &registry.ChannelEntry{ + PackageName: b.Package, + ChannelName: b.Channel, + BundleName: b.Name, + Replaces: s, + }) + } + } + return entries +} diff --git a/staging/operator-registry/pkg/registry/syscall_unix.go b/vendor/github.com/operator-framework/operator-registry/pkg/cache/syscall_unix.go similarity index 84% rename from staging/operator-registry/pkg/registry/syscall_unix.go rename to vendor/github.com/operator-framework/operator-registry/pkg/cache/syscall_unix.go index b1edcf59fd..93372adb4e 100644 --- a/staging/operator-registry/pkg/registry/syscall_unix.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/cache/syscall_unix.go @@ -1,7 +1,7 @@ //go:build !windows // +build !windows -package registry +package cache import "golang.org/x/sys/unix" diff --git a/staging/operator-registry/pkg/registry/syscall_windows.go b/vendor/github.com/operator-framework/operator-registry/pkg/cache/syscall_windows.go similarity index 82% rename from staging/operator-registry/pkg/registry/syscall_windows.go rename to 
vendor/github.com/operator-framework/operator-registry/pkg/cache/syscall_windows.go index 525c656f1c..7ff5ad8e86 100644 --- a/staging/operator-registry/pkg/registry/syscall_windows.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/cache/syscall_windows.go @@ -1,6 +1,6 @@ //go:build windows // +build windows -package registry +package cache var umask = func(i int) int { return 0 } diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/tar.go b/vendor/github.com/operator-framework/operator-registry/pkg/cache/tar.go similarity index 98% rename from vendor/github.com/operator-framework/operator-registry/pkg/registry/tar.go rename to vendor/github.com/operator-framework/operator-registry/pkg/cache/tar.go index f62a15da85..b368e011e9 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/tar.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/cache/tar.go @@ -1,4 +1,4 @@ -package registry +package cache import ( "archive/tar" diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go b/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go deleted file mode 100644 index 4c4212217f..0000000000 --- a/vendor/github.com/operator-framework/operator-registry/pkg/registry/query.go +++ /dev/null @@ -1,661 +0,0 @@ -package registry - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "hash/fnv" - "io/fs" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-registry/alpha/model" - "github.com/operator-framework/operator-registry/pkg/api" -) - -const ( - cachePermissionDir = 0750 - cachePermissionFile = 0640 -) - -type Querier struct { - *cache -} - -func (q Querier) Close() error { - return q.cache.close() -} - -type apiBundleKey struct { - pkgName string - chName string - name string -} - -type SliceBundleSender []*api.Bundle - 
-func (s *SliceBundleSender) Send(b *api.Bundle) error { - - *s = append(*s, b) - return nil -} - -var _ GRPCQuery = &Querier{} - -func NewQuerierFromFS(fbcFS fs.FS, cacheDir string) (*Querier, error) { - q := &Querier{} - var err error - q.cache, err = newCache(cacheDir, &fbcCacheModel{ - FBC: fbcFS, - Cache: os.DirFS(cacheDir), - }) - if err != nil { - return q, err - } - return q, nil -} - -func NewQuerier(m model.Model) (*Querier, error) { - q := &Querier{} - var err error - q.cache, err = newCache("", &nonDigestableModel{Model: m}) - if err != nil { - return q, err - } - return q, nil -} - -func (q Querier) loadAPIBundle(k apiBundleKey) (*api.Bundle, error) { - filename, ok := q.apiBundles[k] - if !ok { - return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", k.pkgName, k.chName, k.name) - } - d, err := os.ReadFile(filename) - if err != nil { - return nil, err - } - var b api.Bundle - if err := json.Unmarshal(d, &b); err != nil { - return nil, err - } - return &b, nil -} - -func (q Querier) ListPackages(_ context.Context) ([]string, error) { - var packages []string - for pkgName := range q.pkgs { - packages = append(packages, pkgName) - } - return packages, nil -} - -func (q Querier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - var bundleSender SliceBundleSender - - err := q.SendBundles(ctx, &bundleSender) - if err != nil { - return nil, err - } - - return bundleSender, nil -} - -func (q Querier) SendBundles(_ context.Context, s BundleSender) error { - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - if apiBundle.BundlePath != "" { - // The SQLite-based server - // configures its querier to - // omit these fields when - // bundle path is set. 
- apiBundle.CsvJson = "" - apiBundle.Object = nil - } - if err := s.Send(apiBundle); err != nil { - return err - } - } - } - } - return nil -} - -func (q Querier) GetPackage(_ context.Context, name string) (*PackageManifest, error) { - pkg, ok := q.pkgs[name] - if !ok { - return nil, fmt.Errorf("package %q not found", name) - } - - var channels []PackageChannel - for _, ch := range pkg.Channels { - channels = append(channels, PackageChannel{ - Name: ch.Name, - CurrentCSVName: ch.Head, - }) - } - return &PackageManifest{ - PackageName: pkg.Name, - Channels: channels, - DefaultChannelName: pkg.DefaultChannel, - }, nil -} - -func (q Querier) GetBundle(_ context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) { - pkg, ok := q.pkgs[pkgName] - if !ok { - return nil, fmt.Errorf("package %q not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - b, ok := ch.Bundles[csvName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q, bundle %q not found", pkgName, channelName, csvName) - } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - - // unset Replaces and Skips (sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil -} - -func (q Querier) GetBundleForChannel(_ context.Context, pkgName string, channelName string) (*api.Bundle, error) { - pkg, ok := q.pkgs[pkgName] - if !ok { - return nil, fmt.Errorf("package %q not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, ch.Head}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", ch.Head, err) - } - - // unset Replaces and Skips 
(sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil -} - -func (q Querier) GetChannelEntriesThatReplace(_ context.Context, name string) ([]*ChannelEntry, error) { - var entries []*ChannelEntry - - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - entries = append(entries, channelEntriesThatReplace(b, name)...) - } - } - } - if len(entries) == 0 { - return nil, fmt.Errorf("no channel entries found that replace %s", name) - } - return entries, nil -} - -func (q Querier) GetBundleThatReplaces(_ context.Context, name, pkgName, channelName string) (*api.Bundle, error) { - pkg, ok := q.pkgs[pkgName] - if !ok { - return nil, fmt.Errorf("package %s not found", pkgName) - } - ch, ok := pkg.Channels[channelName] - if !ok { - return nil, fmt.Errorf("package %q, channel %q not found", pkgName, channelName) - } - - // NOTE: iterating over a map is non-deterministic in Go, so if multiple bundles replace this one, - // the bundle returned by this function is also non-deterministic. The sqlite implementation - // is ALSO non-deterministic because it doesn't use ORDER BY, so its probably okay for this - // implementation to be non-deterministic as well. 
- for _, b := range ch.Bundles { - if bundleReplaces(b, name) { - apiBundle, err := q.loadAPIBundle(apiBundleKey{pkg.Name, ch.Name, b.Name}) - if err != nil { - return nil, fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - - // unset Replaces and Skips (sqlite query does not populate these fields) - apiBundle.Replaces = "" - apiBundle.Skips = nil - return apiBundle, nil - } - } - return nil, fmt.Errorf("no entry found for package %q, channel %q", pkgName, channelName) -} - -func (q Querier) GetChannelEntriesThatProvide(_ context.Context, group, version, kind string) ([]*ChannelEntry, error) { - var entries []*ChannelEntry - - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - for _, b := range ch.Bundles { - provides, err := q.doesModelBundleProvide(b, group, version, kind) - if err != nil { - return nil, err - } - if provides { - // TODO(joelanford): It seems like the SQLite query returns - // invalid entries (i.e. where bundle `Replaces` isn't actually - // in channel `ChannelName`). Is that a bug? For now, this mimics - // the sqlite server and returns seemingly invalid channel entries. - // Don't worry about this. Not used anymore. - - entries = append(entries, q.channelEntriesForBundle(b, true)...) - } - } - } - } - if len(entries) == 0 { - return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) - } - return entries, nil -} - -// TODO(joelanford): Need to review the expected functionality of this function. I ran -// some experiments with the sqlite version of this function and it seems to only return -// channel heads that provide the GVK (rather than searching down the graph if parent bundles -// don't provide the API). Based on that, this function currently looks at channel heads only. -// --- -// Separate, but possibly related, I noticed there are several channels in the channel entry -// table who's minimum depth is 1. 
What causes 1 to be minimum depth in some cases and 0 in others? -func (q Querier) GetLatestChannelEntriesThatProvide(_ context.Context, group, version, kind string) ([]*ChannelEntry, error) { - var entries []*ChannelEntry - - for _, pkg := range q.pkgs { - for _, ch := range pkg.Channels { - b := ch.Bundles[ch.Head] - provides, err := q.doesModelBundleProvide(b, group, version, kind) - if err != nil { - return nil, err - } - if provides { - entries = append(entries, q.channelEntriesForBundle(b, false)...) - } - } - } - if len(entries) == 0 { - return nil, fmt.Errorf("no channel entries found that provide group:%q version:%q kind:%q", group, version, kind) - } - return entries, nil -} - -func (q Querier) GetBundleThatProvides(ctx context.Context, group, version, kind string) (*api.Bundle, error) { - latestEntries, err := q.GetLatestChannelEntriesThatProvide(ctx, group, version, kind) - if err != nil { - return nil, err - } - - // It's possible for multiple packages to provide an API, but this function is forced to choose one. - // To do that deterministically, we'll pick the the bundle based on a lexicographical sort of its - // package name. - sort.Slice(latestEntries, func(i, j int) bool { - return latestEntries[i].PackageName < latestEntries[j].PackageName - }) - - for _, entry := range latestEntries { - pkg, ok := q.pkgs[entry.PackageName] - if !ok { - // This should never happen because the latest entries were - // collected based on iterating over the packages in q.pkgs. 
- continue - } - if entry.ChannelName == pkg.DefaultChannel { - return q.GetBundle(ctx, entry.PackageName, entry.ChannelName, entry.BundleName) - } - } - return nil, fmt.Errorf("no entry found that provides group:%q version:%q kind:%q", group, version, kind) -} - -func (q Querier) doesModelBundleProvide(b cBundle, group, version, kind string) (bool, error) { - apiBundle, err := q.loadAPIBundle(apiBundleKey{b.Package, b.Channel, b.Name}) - if err != nil { - return false, fmt.Errorf("convert bundle %q: %v", b.Name, err) - } - for _, gvk := range apiBundle.ProvidedApis { - if gvk.Group == group && gvk.Version == version && gvk.Kind == kind { - return true, nil - } - } - return false, nil -} - -func bundleReplaces(b cBundle, name string) bool { - if b.Replaces == name { - return true - } - for _, s := range b.Skips { - if s == name { - return true - } - } - return false -} - -func channelEntriesThatReplace(b cBundle, name string) []*ChannelEntry { - var entries []*ChannelEntry - if b.Replaces == name { - entries = append(entries, &ChannelEntry{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: b.Replaces, - }) - } - for _, s := range b.Skips { - if s == name && s != b.Replaces { - entries = append(entries, &ChannelEntry{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: b.Replaces, - }) - } - } - return entries -} - -func (q Querier) channelEntriesForBundle(b cBundle, ignoreChannel bool) []*ChannelEntry { - entries := []*ChannelEntry{{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: b.Replaces, - }} - for _, s := range b.Skips { - // Ignore skips that duplicate b.Replaces. Also, only add it if its - // in the same channel as b (or we're ignoring channel presence). 
- if _, inChannel := q.pkgs[b.Package].Channels[b.Channel].Bundles[s]; s != b.Replaces && (ignoreChannel || inChannel) { - entries = append(entries, &ChannelEntry{ - PackageName: b.Package, - ChannelName: b.Channel, - BundleName: b.Name, - Replaces: s, - }) - } - } - return entries -} - -type cache struct { - digest string - baseDir string - persist bool - pkgs map[string]cPkg - apiBundles map[apiBundleKey]string -} - -func newCache(baseDir string, model digestableModel) (*cache, error) { - var ( - qc *cache - err error - ) - if baseDir == "" { - qc, err = newEphemeralCache() - } else { - qc, err = newPersistentCache(baseDir) - } - if err != nil { - return nil, err - } - return qc, qc.load(model) -} - -func (qc cache) close() error { - if qc.persist { - return nil - } - return os.RemoveAll(qc.baseDir) -} - -func newEphemeralCache() (*cache, error) { - baseDir, err := os.MkdirTemp("", "opm-serve-cache-") - if err != nil { - return nil, err - } - if err := os.MkdirAll(filepath.Join(baseDir, "cache"), cachePermissionDir); err != nil { - return nil, err - } - return &cache{ - digest: "", - baseDir: baseDir, - persist: false, - }, nil -} - -func newPersistentCache(baseDir string) (*cache, error) { - if err := os.MkdirAll(baseDir, cachePermissionDir); err != nil { - return nil, err - } - qc := &cache{baseDir: baseDir, persist: true} - if digest, err := os.ReadFile(filepath.Join(baseDir, "digest")); err == nil { - qc.digest = strings.TrimSpace(string(digest)) - } - return qc, nil -} - -func (qc *cache) load(model digestableModel) error { - computedDigest, err := model.GetDigest() - if err != nil && !errors.Is(err, errNonDigestable) { - return fmt.Errorf("compute digest: %v", err) - } - if err == nil && computedDigest == qc.digest { - err = qc.loadFromCache() - if err == nil { - return nil - } - // if there _was_ an error loading from the cache, - // we'll drop down and repopulate from scratch. 
- } - return qc.repopulateCache(model) -} - -func (qc *cache) loadFromCache() error { - packagesData, err := os.ReadFile(filepath.Join(qc.baseDir, "cache", "packages.json")) - if err != nil { - return err - } - if err := json.Unmarshal(packagesData, &qc.pkgs); err != nil { - return err - } - qc.apiBundles = map[apiBundleKey]string{} - for _, p := range qc.pkgs { - for _, ch := range p.Channels { - for _, b := range ch.Bundles { - filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) - qc.apiBundles[apiBundleKey{pkgName: p.Name, chName: ch.Name, name: b.Name}] = filename - } - } - } - return nil -} - -func (qc *cache) repopulateCache(model digestableModel) error { - // ensure that generated cache is available to all future users - oldUmask := umask(000) - defer umask(oldUmask) - - m, err := model.GetModel() - if err != nil { - return err - } - cacheDirEntries, err := os.ReadDir(qc.baseDir) - if err != nil { - return err - } - for _, e := range cacheDirEntries { - if err := os.RemoveAll(filepath.Join(qc.baseDir, e.Name())); err != nil { - return err - } - } - if err := os.MkdirAll(filepath.Join(qc.baseDir, "cache"), cachePermissionDir); err != nil { - return err - } - - qc.pkgs, err = packagesFromModel(m) - if err != nil { - return err - } - - packageJson, err := json.Marshal(qc.pkgs) - if err != nil { - return err - } - if err := os.WriteFile(filepath.Join(qc.baseDir, "cache", "packages.json"), packageJson, cachePermissionFile); err != nil { - return err - } - - qc.apiBundles = map[apiBundleKey]string{} - for _, p := range m { - for _, ch := range p.Channels { - for _, b := range ch.Bundles { - apiBundle, err := api.ConvertModelBundleToAPIBundle(*b) - if err != nil { - return err - } - jsonBundle, err := json.Marshal(apiBundle) - if err != nil { - return err - } - filename := filepath.Join(qc.baseDir, "cache", fmt.Sprintf("%s_%s_%s.json", p.Name, ch.Name, b.Name)) - if err := os.WriteFile(filename, jsonBundle, 
cachePermissionFile); err != nil { - return err - } - qc.apiBundles[apiBundleKey{p.Name, ch.Name, b.Name}] = filename - } - } - } - computedHash, err := model.GetDigest() - if err == nil { - if err := os.WriteFile(filepath.Join(qc.baseDir, "digest"), []byte(computedHash), cachePermissionFile); err != nil { - return err - } - } else if !errors.Is(err, errNonDigestable) { - return fmt.Errorf("compute digest: %v", err) - } - return nil -} - -func packagesFromModel(m model.Model) (map[string]cPkg, error) { - pkgs := map[string]cPkg{} - for _, p := range m { - newP := cPkg{ - Name: p.Name, - Description: p.Description, - DefaultChannel: p.DefaultChannel.Name, - Channels: map[string]cChannel{}, - } - if p.Icon != nil { - newP.Icon = &declcfg.Icon{ - Data: p.Icon.Data, - MediaType: p.Icon.MediaType, - } - } - for _, ch := range p.Channels { - head, err := ch.Head() - if err != nil { - return nil, err - } - newCh := cChannel{ - Name: ch.Name, - Head: head.Name, - Bundles: map[string]cBundle{}, - } - for _, b := range ch.Bundles { - newB := cBundle{ - Package: b.Package.Name, - Channel: b.Channel.Name, - Name: b.Name, - Replaces: b.Replaces, - Skips: b.Skips, - } - newCh.Bundles[b.Name] = newB - } - newP.Channels[ch.Name] = newCh - } - pkgs[p.Name] = newP - } - return pkgs, nil -} - -type cPkg struct { - Name string `json:"name"` - Description string `json:"description"` - Icon *declcfg.Icon `json:"icon"` - DefaultChannel string `json:"defaultChannel"` - Channels map[string]cChannel -} - -type cChannel struct { - Name string - Head string - Bundles map[string]cBundle -} - -type cBundle struct { - Package string `json:"package"` - Channel string `json:"channel"` - Name string `json:"name"` - Replaces string `json:"replaces"` - Skips []string `json:"skips"` -} - -type digestableModel interface { - GetModel() (model.Model, error) - GetDigest() (string, error) -} - -type fbcCacheModel struct { - FBC fs.FS - Cache fs.FS -} - -func (m *fbcCacheModel) GetModel() (model.Model, 
error) { - fbc, err := declcfg.LoadFS(m.FBC) - if err != nil { - return nil, err - } - return declcfg.ConvertToModel(*fbc) -} - -func (m *fbcCacheModel) GetDigest() (string, error) { - computedHasher := fnv.New64a() - if err := fsToTar(computedHasher, m.FBC); err != nil { - return "", err - } - if cacheFS, err := fs.Sub(m.Cache, "cache"); err == nil { - if err := fsToTar(computedHasher, cacheFS); err != nil && !errors.Is(err, os.ErrNotExist) { - return "", fmt.Errorf("compute hash: %v", err) - } - } - return fmt.Sprintf("%x", computedHasher.Sum(nil)), nil -} - -var errNonDigestable = errors.New("cannot generate digest") - -type nonDigestableModel struct { - model.Model -} - -func (m *nonDigestableModel) GetModel() (model.Model, error) { - return m.Model, nil -} - -func (m *nonDigestableModel) GetDigest() (string, error) { - return "", errNonDigestable -} diff --git a/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go b/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go index 6d46d544c2..24880f1fca 100644 --- a/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go +++ b/vendor/github.com/operator-framework/operator-registry/pkg/sqlite/query.go @@ -1065,8 +1065,15 @@ func (s *SQLQuerier) SendBundles(ctx context.Context, stream registry.BundleSend return nil } +type sliceBundleSender []*api.Bundle + +func (s *sliceBundleSender) Send(b *api.Bundle) error { + *s = append(*s, b) + return nil +} + func (s *SQLQuerier) ListBundles(ctx context.Context) ([]*api.Bundle, error) { - var bundleSender registry.SliceBundleSender + var bundleSender sliceBundleSender err := s.SendBundles(ctx, &bundleSender) if err != nil { return nil, err diff --git a/vendor/modules.txt b/vendor/modules.txt index 6bd4c18838..2363e2774c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -777,6 +777,7 @@ github.com/operator-framework/operator-registry/cmd/opm/version 
github.com/operator-framework/operator-registry/cmd/registry-server github.com/operator-framework/operator-registry/pkg/api github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1 +github.com/operator-framework/operator-registry/pkg/cache github.com/operator-framework/operator-registry/pkg/client github.com/operator-framework/operator-registry/pkg/configmap github.com/operator-framework/operator-registry/pkg/containertools