Skip to content

Commit

Permalink
*: add service subcommand (#5971)
Browse files Browse the repository at this point in the history
ref #5836, ref #5964

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
Co-authored-by: lhy1024 <[email protected]>
  • Loading branch information
3 people authored Feb 15, 2023
1 parent 2d9ff09 commit f1e3bf5
Show file tree
Hide file tree
Showing 14 changed files with 320 additions and 203 deletions.
137 changes: 105 additions & 32 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/autoscaling"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand All @@ -41,43 +42,83 @@ import (
)

func main() {
ctx, cancel, svr := createServerWrapper(os.Args[1:])

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()
rootCmd := &cobra.Command{
Use: "pd-server",
Short: "Placement Driver server",
Run: createServerWrapper,
}

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
rootCmd.Flags().BoolP("version", "V", false, "print version information and exit")
rootCmd.Flags().StringP("config", "", "", "config file")
rootCmd.Flags().BoolP("config-check", "", false, "check config file validity and exit")
rootCmd.Flags().StringP("name", "", "", "human-readable name for this pd member")
rootCmd.Flags().StringP("data-dir", "", "", "path to the data directory (default 'default.${name}')")
rootCmd.Flags().StringP("client-urls", "", "http://127.0.0.1:2379", "url for client traffic")
rootCmd.Flags().StringP("advertise-client-urls", "", "", "advertise url for client traffic (default '${client-urls}')")
rootCmd.Flags().StringP("peer-urls", "", "http://127.0.0.1:2379", "url for peer traffic")
rootCmd.Flags().StringP("advertise-peer-urls", "", "", "advertise url for peer traffic (default '${peer-urls}')")
rootCmd.Flags().StringP("initial-cluster", "", "", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380")
rootCmd.Flags().StringP("join", "", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'")
rootCmd.Flags().StringP("metrics-addr", "", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
rootCmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
rootCmd.Flags().StringP("log-file", "", "", "log file path")
rootCmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
rootCmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
rootCmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
rootCmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
rootCmd.AddCommand(NewServiceCommand())

rootCmd.SetOutput(os.Stdout)
if err := rootCmd.Execute(); err != nil {
rootCmd.Println(err)
os.Exit(1)
}
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))
// NewServiceCommand returns the service command.
func NewServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "service <tso>",
Short: "Run a service",
}
cmd.AddCommand(NewTSOServiceCommand())
return cmd
}

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
// NewTSOServiceCommand returns the unsafe remove failed stores command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tso",
Short: "Run the tso service",
Run: tso.CreateServerWrapper,
}
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "http://127.0.0.1:2379", "url for etcd client")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
return cmd
}

func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs.Server) {
func createServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cfg := config.NewConfig()
err := cfg.Parse(args)
flagSet := cmd.Flags()
flagSet.Parse(args)
err := cfg.Parse(flagSet)
if err != nil {
cmd.Println(err)
return
}

if cfg.Version {
printVersion, err := flagSet.GetBool("version")
if err != nil {
cmd.Println(err)
return
}
if printVersion {
server.PrintPDInfo()
exit(0)
}
Expand All @@ -92,15 +133,21 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
configCheck, err := flagSet.GetBool("config-check")
if err != nil {
cmd.Println(err)
return
}

if configCheck {
server.PrintConfigCheckMsg(cfg)
exit(0)
}

// New zap logger
err = cfg.SetupLogger()
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err == nil {
log.ReplaceGlobals(cfg.GetZapLogger(), cfg.GetZapLogProperties())
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
} else {
log.Fatal("initialize logger error", errs.ZapError(err))
}
Expand Down Expand Up @@ -132,7 +179,33 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("create server failed", errs.ZapError(err))
}

return ctx, cancel, svr
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func exit(code int) {
Expand Down
75 changes: 58 additions & 17 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ import (
"flag"
"net/http"
"os"
"os/signal"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Server is the TSO server, and it implements bs.Server.
Expand Down Expand Up @@ -69,12 +72,24 @@ func (s *Server) GetHTTPClient() *http.Client {
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, bs.Server) {
func CreateServerWrapper(cmd *cobra.Command, args []string) {
cmd.Flags().Parse(args)
cfg := tso.NewConfig()
err := cfg.Parse(os.Args[1:])
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
if err != nil {
cmd.Println(err)
return
}

if cfg.Version {
printVersionInfo()
printVersion, err := flagSet.GetBool("version")
if err != nil {
cmd.Println(err)
return
}
if printVersion {
// TODO: support printing TSO server info
// server.PrintTSOInfo()
exit(0)
}

Expand All @@ -88,29 +103,55 @@ func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
// New zap logger
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err == nil {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
} else {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()

// TODO: Initialize logger
// TODO: support printing TSO server info
// LogTSOInfo()

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// TODO: Create the server
ctx, cancel := context.WithCancel(context.Background())
svr := &Server{}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

return nil, nil, nil
}

// TODO: implement it
func printVersionInfo() {
}
<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

// TODO: implement it
func printConfigCheckMsg(cfg *tso.Config) {
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func exit(code int) {
Expand Down
Loading

0 comments on commit f1e3bf5

Please sign in to comment.