Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static: build

.PHONY: unit
unit:
$(GO) test -coverprofile=coverage.out $(SPECIFIC_UNIT_TEST) $(TAGS) $(TEST_RACE) -count=1 ./pkg/... ./alpha/...
$(GO) test -coverprofile=coverage.out -coverpkg=./... $(SPECIFIC_UNIT_TEST) $(TAGS) $(TEST_RACE) -count=1 ./pkg/... ./alpha/...

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change means we now consider cross-package coverage. (i.e. the tests in pkg/server/server_test.go can contribute to coverage in pkg/registry/query.go)

This very likely means there's an artificial increase in the repo-wide coverage, but I did my best to add specific unit tests related to this PR in pkg/registry/query_test.go to make sure the new code paths are relatively well-covered.


.PHONY: sanity-check
sanity-check:
Expand Down
105 changes: 71 additions & 34 deletions cmd/opm/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@ import (
"errors"
"fmt"
"net"
"os"
"sync"

"net/http"
endpoint "net/http/pprof"
"os"
"os/signal"
"runtime/pprof"
"sync"
"syscall"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/operator-framework/operator-registry/alpha/declcfg"
"github.com/operator-framework/operator-registry/pkg/api"
health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
"github.com/operator-framework/operator-registry/pkg/lib/dns"
"github.com/operator-framework/operator-registry/pkg/lib/graceful"
"github.com/operator-framework/operator-registry/pkg/lib/log"
"github.com/operator-framework/operator-registry/pkg/registry"
"github.com/operator-framework/operator-registry/pkg/server"
Expand Down Expand Up @@ -80,11 +80,14 @@ will not be reflected in the served content.

func (s *serve) run(ctx context.Context) error {
p := newProfilerInterface(s.pprofAddr, s.logger)
p.startEndpoint()
if err := p.startCpuProfileCache(); err != nil {
return fmt.Errorf("could not start CPU profile: %v", err)
}

ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)

// Immediately set up termination log
err := log.AddDefaultWriterHooks(s.terminationLog)
if err != nil {
Expand All @@ -98,20 +101,11 @@ func (s *serve) run(ctx context.Context) error {

s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port})

cfg, err := declcfg.LoadFS(os.DirFS(s.configDir))
if err != nil {
return fmt.Errorf("load declarative config directory: %v", err)
}

m, err := declcfg.ConvertToModel(*cfg)
if err != nil {
return fmt.Errorf("could not build index model from declarative config: %v", err)
}
store, err := registry.NewQuerier(m)
defer store.Close()
store, err := registry.NewQuerierFromFS(os.DirFS(s.configDir))
if err != nil {
return err
}
defer store.Close()

lis, err := net.Listen("tcp", ":"+s.port)
if err != nil {
Expand All @@ -122,15 +116,55 @@ func (s *serve) run(ctx context.Context) error {
api.RegisterRegistryServer(grpcServer, server.NewRegistryServer(store))
health.RegisterHealthServer(grpcServer, server.NewHealthServer())
reflection.Register(grpcServer)
s.logger.Info("serving registry")
p.stopCpuProfileCache()

return graceful.Shutdown(s.logger, func() error {
return grpcServer.Serve(lis)
}, func() {
grpcServer.GracefulStop()
p.stopEndpoint(p.logger.Context)

eg.Go(func() error {
// All this channel stuff is necessary so that we can return from
// this function early when the context is cancelled. This is required
// to get `eg.Wait()` to unblock, so that we can proceed to gracefully
// shutting down.
errChan := make(chan error)
go func() {
s.logger.Info("serving registry")
errChan <- grpcServer.Serve(lis)
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
})
eg.Go(func() error {
return p.listenAndServe(ctx)
})
eg.Go(func() (err error) {
defer p.stopCpuProfileCache()
if err := store.Wait(ctx); err != nil {
return err
}
s.logger.Info("registry initialization complete")
return nil
})

// wait until both errgroup goroutines return and then
// return the first error that occurred (or nil)
err = eg.Wait()

// stop the servers prior to handling the error returned
// from Wait().
s.logger.Info("stopping grpc server")
grpcServer.GracefulStop()
if p.isEnabled() {
s.logger.Info("stopping http pprof server")
if err := p.shutdown(context.Background()); err != nil {
return err
}
}

if !errors.Is(err, context.Canceled) {
return err
}
return nil

}

Expand Down Expand Up @@ -162,10 +196,10 @@ func (p *profilerInterface) isEnabled() bool {
return p.addr != ""
}

func (p *profilerInterface) startEndpoint() {
func (p *profilerInterface) listenAndServe(ctx context.Context) error {
// short-circuit if not enabled
if !p.isEnabled() {
return
return nil
}

mux := http.NewServeMux()
Expand All @@ -181,14 +215,19 @@ func (p *profilerInterface) startEndpoint() {
Handler: mux,
}

// goroutine exits with main
errChan := make(chan error)
go func() {

p.logger.Info("starting pprof endpoint")
if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
p.logger.Fatal(err)
errChan <- err
}
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (p *profilerInterface) startCpuProfileCache() error {
Expand Down Expand Up @@ -222,10 +261,8 @@ func (p *profilerInterface) httpHandler(w http.ResponseWriter, r *http.Request)
w.Write(p.cache.Bytes())
}

func (p *profilerInterface) stopEndpoint(ctx context.Context) {
if err := p.server.Shutdown(ctx); err != nil {
p.logger.Fatal(err)
}
func (p *profilerInterface) shutdown(ctx context.Context) error {
return p.server.Shutdown(ctx)
}

func (p *profilerInterface) isCacheReady() bool {
Expand Down
2 changes: 2 additions & 0 deletions pkg/lib/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func newQuerier(t *testing.T, bundles []*model.Bundle) *registry.Querier {
}
reg, err := registry.NewQuerier(pkgs)
require.NoError(t, err)
err = reg.Wait(context.Background())
require.NoError(t, err)
return reg
}

Expand Down
Loading