Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static: build

.PHONY: unit
unit:
$(GO) test -coverprofile=coverage.out $(SPECIFIC_UNIT_TEST) $(TAGS) $(TEST_RACE) -count=1 ./pkg/... ./alpha/...
$(GO) test -coverprofile=coverage.out -coverpkg=./... $(SPECIFIC_UNIT_TEST) $(TAGS) $(TEST_RACE) -count=1 ./pkg/... ./alpha/...
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change means we now consider cross-package coverage. (i.e. the tests in pkg/server/server_test.go can contribute to coverage in pkg/registry/query.go)

This very likely means there's an artificial increase in the repo-wide coverage, but I did my best to add specific unit tests related to this PR in pkg/registry/query_test.go to make sure the new code paths are relatively well-covered.


.PHONY: sanity-check
sanity-check:
Expand Down
154 changes: 119 additions & 35 deletions cmd/opm/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,24 @@ import (
"errors"
"fmt"
"net"
"os"
"sync"

"net/http"
endpoint "net/http/pprof"
"os"
"os/signal"
"runtime/pprof"
"sync"
"syscall"
"time"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/operator-framework/operator-registry/alpha/declcfg"
"github.com/operator-framework/operator-registry/pkg/api"
health "github.com/operator-framework/operator-registry/pkg/api/grpc_health_v1"
"github.com/operator-framework/operator-registry/pkg/lib/dns"
"github.com/operator-framework/operator-registry/pkg/lib/graceful"
"github.com/operator-framework/operator-registry/pkg/lib/log"
"github.com/operator-framework/operator-registry/pkg/registry"
"github.com/operator-framework/operator-registry/pkg/server"
Expand All @@ -33,6 +34,7 @@ type serve struct {

port string
terminationLog string
serverTimeout time.Duration

debug bool
pprofAddr string
Expand Down Expand Up @@ -75,16 +77,20 @@ will not be reflected in the served content.
cmd.Flags().StringVarP(&s.terminationLog, "termination-log", "t", "/dev/termination-log", "path to a container termination log file")
cmd.Flags().StringVarP(&s.port, "port", "p", "50051", "port number to serve on")
cmd.Flags().StringVar(&s.pprofAddr, "pprof-addr", "", "address of startup profiling endpoint (addr:port format)")
cmd.Flags().DurationVar(&s.serverTimeout, "server-timeout", time.Second*30, "server-enforced timeout for grpc requests")
return cmd
}

func (s *serve) run(ctx context.Context) error {
p := newProfilerInterface(s.pprofAddr, s.logger)
p.startEndpoint()
if err := p.startCpuProfileCache(); err != nil {
return fmt.Errorf("could not start CPU profile: %v", err)
}

ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)

// Immediately set up termination log
err := log.AddDefaultWriterHooks(s.terminationLog)
if err != nil {
Expand All @@ -98,40 +104,84 @@ func (s *serve) run(ctx context.Context) error {

s.logger = s.logger.WithFields(logrus.Fields{"configs": s.configDir, "port": s.port})

cfg, err := declcfg.LoadFS(os.DirFS(s.configDir))
if err != nil {
return fmt.Errorf("load declarative config directory: %v", err)
}

m, err := declcfg.ConvertToModel(*cfg)
if err != nil {
return fmt.Errorf("could not build index model from declarative config: %v", err)
}
store, err := registry.NewQuerier(m)
defer store.Close()
store, err := registry.NewQuerierFromFS(os.DirFS(s.configDir))
if err != nil {
return err
}
defer store.Close()

lis, err := net.Listen("tcp", ":"+s.port)
if err != nil {
s.logger.Fatalf("failed to listen: %s", err)
}

grpcServer := grpc.NewServer()
var grpcServerOpts []grpc.ServerOption
if s.serverTimeout > 0 {
grpcServerOpts = append(grpcServerOpts, grpc.StreamInterceptor(func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
dss := &deadlinableServerStream{timeout: s.serverTimeout, ServerStream: ss}
defer dss.Cancel()
return handler(srv, dss)
}), grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
ctx, cancel := context.WithTimeout(ctx, s.serverTimeout)
defer cancel()
return handler(ctx, req)
}))
}

grpcServer := grpc.NewServer(grpcServerOpts...)
api.RegisterRegistryServer(grpcServer, server.NewRegistryServer(store))
health.RegisterHealthServer(grpcServer, server.NewHealthServer())
reflection.Register(grpcServer)
s.logger.Info("serving registry")
p.stopCpuProfileCache()

return graceful.Shutdown(s.logger, func() error {
return grpcServer.Serve(lis)
}, func() {
grpcServer.GracefulStop()
p.stopEndpoint(p.logger.Context)

eg.Go(func() error {
// All this channel stuff is necessary so that we can return from
// this function early when the context is cancelled. This is required
// to get `eg.Wait()` to unblock, so that we can proceed to gracefully
// shutting down.
errChan := make(chan error)
go func() {
s.logger.Info("serving registry")
errChan <- grpcServer.Serve(lis)
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
})
eg.Go(func() error {
return p.listenAndServe(ctx)
})
eg.Go(func() (err error) {
defer p.stopCpuProfileCache()
if err := store.Wait(ctx); err != nil {
return err
}
s.logger.Info("registry initialization complete")
return nil
})

// wait until both errgroup goroutines return and then
// return the first error that occurred (or nil)
err = eg.Wait()

// stop the servers prior to handling the error returned
// from Wait().
s.logger.Info("stopping grpc server")
grpcServer.GracefulStop()
if p.isEnabled() {
s.logger.Info("stopping http pprof server")
if err := p.shutdown(context.Background()); err != nil {
return err
}
}

if !errors.Is(err, context.Canceled) {
return err
}
return nil

}

// manages an HTTP pprof endpoint served by `server`,
Expand Down Expand Up @@ -162,10 +212,10 @@ func (p *profilerInterface) isEnabled() bool {
return p.addr != ""
}

func (p *profilerInterface) startEndpoint() {
func (p *profilerInterface) listenAndServe(ctx context.Context) error {
// short-circuit if not enabled
if !p.isEnabled() {
return
return nil
}

mux := http.NewServeMux()
Expand All @@ -181,14 +231,19 @@ func (p *profilerInterface) startEndpoint() {
Handler: mux,
}

// goroutine exits with main
errChan := make(chan error)
go func() {

p.logger.Info("starting pprof endpoint")
if err := p.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
p.logger.Fatal(err)
errChan <- err
}
}()
select {
case err := <-errChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (p *profilerInterface) startCpuProfileCache() error {
Expand Down Expand Up @@ -222,10 +277,8 @@ func (p *profilerInterface) httpHandler(w http.ResponseWriter, r *http.Request)
w.Write(p.cache.Bytes())
}

func (p *profilerInterface) stopEndpoint(ctx context.Context) {
if err := p.server.Shutdown(ctx); err != nil {
p.logger.Fatal(err)
}
func (p *profilerInterface) shutdown(ctx context.Context) error {
return p.server.Shutdown(ctx)
}

func (p *profilerInterface) isCacheReady() bool {
Expand All @@ -241,3 +294,34 @@ func (p *profilerInterface) setCacheReady() {
p.cacheReady = true
p.cacheLock.Unlock()
}

type deadlinableServerStream struct {
grpc.ServerStream
timeout time.Duration

m sync.Mutex
cancelFunc func()
cancelled bool
}

func (ss *deadlinableServerStream) Context() context.Context {
ss.m.Lock()
defer ss.m.Unlock()
if ss.cancelled {
ctx, cancel := context.WithCancel(ss.ServerStream.Context())
cancel()
return ctx
}
ctx, cancel := context.WithTimeout(ss.ServerStream.Context(), ss.timeout)
ss.cancelFunc = cancel
return ctx
}

func (ss *deadlinableServerStream) Cancel() {
ss.m.Lock()
defer ss.m.Unlock()
if ss.cancelFunc != nil {
ss.cancelFunc()
}
ss.cancelled = true
}
2 changes: 2 additions & 0 deletions pkg/lib/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func newQuerier(t *testing.T, bundles []*model.Bundle) *registry.Querier {
}
reg, err := registry.NewQuerier(pkgs)
require.NoError(t, err)
err = reg.Wait(context.Background())
require.NoError(t, err)
return reg
}

Expand Down
Loading