diff --git a/cmd/layotto/main.go b/cmd/layotto/main.go index d4ccb41b6d..f885a8b703 100644 --- a/cmd/layotto/main.go +++ b/cmd/layotto/main.go @@ -22,6 +22,8 @@ import ( "strconv" "time" + "mosn.io/mosn/pkg/istio" + "github.com/dapr/components-contrib/secretstores" "github.com/dapr/components-contrib/secretstores/aws/parameterstore" "github.com/dapr/components-contrib/secretstores/aws/secretmanager" @@ -200,6 +202,7 @@ var loggerForDaprComp = logger.NewLogger("reuse.dapr.component") // GitVersion mosn version is specified by latest tag var GitVersion = "" +var IstioVersion = "1.10.6" func init() { mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer) @@ -461,6 +464,8 @@ func registerAppInfo(app *cli.App) { appInfo.Version = app.Version appInfo.Compiled = app.Compiled actuator.SetAppInfoSingleton(appInfo) + // set istio version + istio.IstioVersion = IstioVersion } func newRuntimeApp(startCmd *cli.Command) *cli.App { diff --git a/cmd/layotto_multiple_api/cmd.go b/cmd/layotto_multiple_api/cmd.go new file mode 100644 index 0000000000..39bb6e9f00 --- /dev/null +++ b/cmd/layotto_multiple_api/cmd.go @@ -0,0 +1,302 @@ +package main + +import ( + "os" + "runtime" + "time" + + "mosn.io/api" + "mosn.io/mosn/istio/istio1106" + v2 "mosn.io/mosn/pkg/config/v2" + "mosn.io/mosn/pkg/configmanager" + "mosn.io/mosn/pkg/log" + "mosn.io/mosn/pkg/metrics" + "mosn.io/mosn/pkg/mosn" + "mosn.io/mosn/pkg/protocol" + "mosn.io/mosn/pkg/protocol/xprotocol" + "mosn.io/mosn/pkg/protocol/xprotocol/bolt" + "mosn.io/mosn/pkg/protocol/xprotocol/dubbo" + "mosn.io/mosn/pkg/server" + "mosn.io/mosn/pkg/stagemanager" + xstream "mosn.io/mosn/pkg/stream/xprotocol" + "mosn.io/mosn/pkg/trace" + mosn_jaeger "mosn.io/mosn/pkg/trace/jaeger" + "mosn.io/mosn/pkg/trace/skywalking" + tracehttp "mosn.io/mosn/pkg/trace/sofa/http" + xtrace "mosn.io/mosn/pkg/trace/sofa/xprotocol" + tracebolt "mosn.io/mosn/pkg/trace/sofa/xprotocol/bolt" + mosn_zipkin "mosn.io/mosn/pkg/trace/zipkin" + "mosn.io/pkg/buffer" + + component_actuators "mosn.io/layotto/components/pkg/actuators" + "mosn.io/layotto/diagnostics" + "mosn.io/layotto/diagnostics/jaeger" + lprotocol "mosn.io/layotto/diagnostics/protocol" + lsky "mosn.io/layotto/diagnostics/skywalking" + "mosn.io/layotto/diagnostics/zipkin" + + // Actuator + "mosn.io/layotto/pkg/actuator/health" + "mosn.io/layotto/pkg/integrate/actuator" + + "github.com/urfave/cli" + "mosn.io/mosn/pkg/featuregate" +) + +var ( + flagToMosnLogLevel = map[string]string{ + "trace": "TRACE", + "debug": "DEBUG", + "info": "INFO", + "warning": "WARN", + "error": "ERROR", + "critical": "FATAL", + "off": "OFF", + } + + cmdStart = cli.Command{ + Name: "start", + Usage: "start runtime. For example: ./layotto start -c configs/config_standalone.json", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config, c", + Usage: "Load configuration from `FILE`", + EnvVar: "RUNTIME_CONFIG", + Value: "configs/config.json", + }, cli.StringFlag{ + Name: "service-cluster, s", + Usage: "sidecar service cluster", + EnvVar: "SERVICE_CLUSTER", + }, cli.StringFlag{ + Name: "service-node, n", + Usage: "sidecar service node", + EnvVar: "SERVICE_NODE", + }, cli.StringFlag{ + Name: "service-type, p", + Usage: "sidecar service type", + EnvVar: "SERVICE_TYPE", + }, cli.StringSliceFlag{ + Name: "service-meta, sm", + Usage: "sidecar service metadata", + EnvVar: "SERVICE_META", + }, cli.StringSliceFlag{ + Name: "service-lables, sl", + Usage: "sidecar service metadata labels", + EnvVar: "SERVICE_LAB", + }, cli.StringSliceFlag{ + Name: "cluster-domain, domain", + Usage: "sidecar service metadata labels", + EnvVar: "CLUSTER_DOMAIN", + }, cli.StringFlag{ + Name: "feature-gates, f", + Usage: "config feature gates", + EnvVar: "FEATURE_GATES", + }, cli.StringFlag{ + Name: "pod-namespace, pns", + Usage: "mosn pod namespaces", + EnvVar: "POD_NAMESPACE", + }, cli.StringFlag{ + Name: "pod-name, pn", + Usage: "mosn pod name", + EnvVar: "POD_NAME", + }, cli.StringFlag{ + Name: "pod-ip, pi", + Usage: "mosn pod ip", + EnvVar: "POD_IP", + }, cli.StringFlag{ + Name: "log-level, l", + Usage: "mosn log level, trace|debug|info|warning|error|critical|off", + EnvVar: "LOG_LEVEL", + }, cli.StringFlag{ + Name: "log-format, lf", + Usage: "mosn log format, currently useless", + }, cli.StringSliceFlag{ + Name: "component-log-level, lc", + Usage: "mosn component format, currently useless", + }, cli.StringFlag{ + Name: "local-address-ip-version", + Usage: "ip version, v4 or v6, currently useless", + }, cli.IntFlag{ + Name: "restart-epoch", + Usage: "epoch to restart, align to Istio startup params, currently useless", + }, cli.IntFlag{ + Name: "drain-time-s", + Usage: "seconds to drain connections, default 600 seconds", + Value: 600, + }, cli.StringFlag{ + Name: "parent-shutdown-time-s", + Usage: "parent shutdown time seconds, align to Istio startup params, currently useless", + }, cli.IntFlag{ + Name: "max-obj-name-len", + Usage: "object name limit, align to Istio startup params, currently useless", + }, cli.IntFlag{ + Name: "concurrency", + Usage: "concurrency, align to Istio startup params, currently useless", + }, cli.IntFlag{ + Name: "log-format-prefix-with-location", + Usage: "log-format-prefix-with-location, align to Istio startup params, currently useless", + }, cli.IntFlag{ + Name: "bootstrap-version", + Usage: "API version to parse the bootstrap config as (e.g. 3). If unset, all known versions will be attempted", + }, cli.StringFlag{ + Name: "drain-strategy", + Usage: "immediate", + }, cli.BoolTFlag{ + Name: "disable-hot-restart", + Usage: "disable-hot-restart", + }, + }, + Action: func(c *cli.Context) error { + app := mosn.NewMosn() + stm := stagemanager.InitStageManager(c, c.String("config"), app) + + // if needs featuregate init in parameter stage or init stage + // append a new stage and called featuregate.ExecuteInitFunc(keys...) + // parameter parsed registered + stm.AppendParamsParsedStage(ExtensionsRegister) + + stm.AppendParamsParsedStage(DefaultParamsParsed) + + // init Stage + stm.AppendInitStage(func(cfg *v2.MOSNConfig) { + drainTime := c.Int("drain-time-s") + server.SetDrainTime(time.Duration(drainTime) * time.Second) + // istio parameters + serviceCluster := c.String("service-cluster") + serviceNode := c.String("service-node") + serviceType := c.String("service-type") + serviceMeta := c.StringSlice("service-meta") + metaLabels := c.StringSlice("service-lables") + clusterDomain := c.String("cluster-domain") + podName := c.String("pod-name") + podNamespace := c.String("pod-namespace") + podIp := c.String("pod-ip") + + if serviceNode != "" { + istio1106.InitXdsInfo(cfg, serviceCluster, serviceNode, serviceMeta, metaLabels) + } else { + if istio1106.IsApplicationNodeType(serviceType) { + sn := podName + "." + podNamespace + serviceNode = serviceType + "~" + podIp + "~" + sn + "~" + clusterDomain + istio1106.InitXdsInfo(cfg, serviceCluster, serviceNode, serviceMeta, metaLabels) + } else { + log.StartLogger.Infof("[mosn] [start] xds service type is not router/sidecar, use config only") + istio1106.InitXdsInfo(cfg, "", "", nil, nil) + } + } + }) + stm.AppendInitStage(mosn.DefaultInitStage) + stm.AppendInitStage(func(_ *v2.MOSNConfig) { + // set version and go version + metrics.SetVersion(GitVersion) + metrics.SetGoVersion(runtime.Version()) + }) + // pre-startup + stm.AppendPreStartStage(mosn.DefaultPreStartStage) // called finally stage by default + // startup + stm.AppendStartStage(mosn.DefaultStartStage) + // after-startup + stm.AppendAfterStartStage(SetActuatorAfterStart) + // execute all stages + stm.RunAll() + return nil + }, + } + + cmdStop = cli.Command{ + Name: "stop", + Usage: "stop mosn proxy", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config, c", + Usage: "load configuration from `FILE`", + EnvVar: "MOSN_CONFIG", + Value: "configs/mosn_config.json", + }, + }, + Action: func(c *cli.Context) (err error) { + app := mosn.NewMosn() + stm := stagemanager.InitStageManager(c, c.String("config"), app) + stm.AppendInitStage(mosn.InitDefaultPath) + return stm.StopMosnProcess() + }, + } + + cmdReload = cli.Command{ + Name: "reload", + Usage: "reconfiguration", + Action: func(c *cli.Context) error { + return nil + }, + } +) + +func SetActuatorAfterStart(_ stagemanager.Application) { + // register component actuator + component_actuators.RangeAllIndicators( + func(name string, v *component_actuators.ComponentsIndicator) bool { + if v != nil { + health.AddLivenessIndicator(name, v.LivenessIndicator) + health.AddReadinessIndicator(name, v.ReadinessIndicator) + } + return true + }) + // set started + actuator.GetRuntimeReadinessIndicator().SetStarted() + actuator.GetRuntimeLivenessIndicator().SetStarted() +} + +func DefaultParamsParsed(c *cli.Context) { + // log level control + flagLogLevel := c.String("log-level") + if mosnLogLevel, ok := flagToMosnLogLevel[flagLogLevel]; ok { + if mosnLogLevel == "OFF" { + log.GetErrorLoggerManagerInstance().Disable() + } else { + log.GetErrorLoggerManagerInstance().SetLogLevelControl(configmanager.ParseLogLevel(mosnLogLevel)) + } + } + // set feature gates + err := featuregate.Set(c.String("feature-gates")) + if err != nil { + log.StartLogger.Infof("[mosn] [start] parse feature-gates flag fail : %+v", err) + os.Exit(1) + } +} + +// ExtensionsRegister for register mosn rpc extensions +func ExtensionsRegister(_ *cli.Context) { + // 1. tracer driver register + // Q: What is a tracer driver ? + // A: MOSN implement a group of trace drivers, but only a configured driver will be loaded. + // A tracer driver can create different tracer by different protocol. + // When MOSN receive a request stream, MOSN will try to start a tracer according to the request protocol. + // For more details,see https://mosn.io/blog/posts/skywalking-support/ + trace.RegisterDriver("SOFATracer", trace.NewDefaultDriverImpl()) + + // 2. xprotocol action register + // ResgisterXProtocolAction is MOSN's xprotocol framework's extensions. + // when a xprotocol implementation (defined by api.XProtocolCodec) registered, the registered action will be called. + xprotocol.ResgisterXProtocolAction(xstream.NewConnPool, xstream.NewStreamFactory, func(codec api.XProtocolCodec) { + name := codec.ProtocolName() + trace.RegisterTracerBuilder("SOFATracer", name, xtrace.NewTracer) + }) + + // 3. register protocols that are used by layotto. + // RegisterXProtocolCodec add a new xprotocol implementation, which is a wrapper for protocol register + _ = xprotocol.RegisterXProtocolCodec(&bolt.XCodec{}) + _ = xprotocol.RegisterXProtocolCodec(&dubbo.XCodec{}) + + // 4. register tracer + xtrace.RegisterDelegate(bolt.ProtocolName, tracebolt.Boltv1Delegate) + trace.RegisterTracerBuilder("SOFATracer", protocol.HTTP1, tracehttp.NewTracer) + trace.RegisterTracerBuilder("SOFATracer", lprotocol.Layotto, diagnostics.NewTracer) + trace.RegisterTracerBuilder(skywalking.SkyDriverName, lprotocol.Layotto, lsky.NewGrpcSkyTracer) + trace.RegisterTracerBuilder(mosn_jaeger.DriverName, lprotocol.Layotto, jaeger.NewGrpcJaegerTracer) + trace.RegisterTracerBuilder(mosn_zipkin.DriverName, lprotocol.Layotto, zipkin.NewGrpcZipTracer) + + // register buffer logger + buffer.SetLogFunc(func(msg string) { + log.DefaultLogger.Errorf("[iobuffer] iobuffer error log info: %s", msg) + }) +} diff --git a/cmd/layotto_multiple_api/main.go b/cmd/layotto_multiple_api/main.go index 123142cdbd..4191939e37 100644 --- a/cmd/layotto_multiple_api/main.go +++ b/cmd/layotto_multiple_api/main.go @@ -22,6 +22,12 @@ import ( "strconv" "time" + "mosn.io/layotto/cmd/layotto_multiple_api/helloworld/component" + "mosn.io/layotto/components/custom" + "mosn.io/layotto/pkg/grpc/dapr" + + "mosn.io/mosn/pkg/istio" + "github.com/dapr/components-contrib/secretstores" "github.com/dapr/components-contrib/secretstores/aws/parameterstore" "github.com/dapr/components-contrib/secretstores/aws/secretmanager" @@ -31,33 +37,17 @@ import ( sercetstores_kubernetes "github.com/dapr/components-contrib/secretstores/kubernetes" secretstore_env "github.com/dapr/components-contrib/secretstores/local/env" secretstore_file "github.com/dapr/components-contrib/secretstores/local/file" - mosn_jaeger "mosn.io/mosn/pkg/trace/jaeger" - mosn_zipkin "mosn.io/mosn/pkg/trace/zipkin" - secretstores_loader "mosn.io/layotto/pkg/runtime/secretstores" - - "mosn.io/api" - _ "mosn.io/mosn/pkg/filter/stream/grpcmetric" - "mosn.io/mosn/pkg/stagemanager" - "mosn.io/mosn/pkg/trace/skywalking" - - helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" - "mosn.io/layotto/cmd/layotto_multiple_api/helloworld/component" - "mosn.io/layotto/components/custom" - component_actuators "mosn.io/layotto/components/pkg/actuators" - l8_grpc "mosn.io/layotto/pkg/grpc" - "mosn.io/layotto/pkg/grpc/dapr" "mosn.io/layotto/pkg/grpc/default_api" - - mock_state "mosn.io/layotto/pkg/mock/components/state" - _ "mosn.io/layotto/pkg/wasm" + secretstores_loader "mosn.io/layotto/pkg/runtime/secretstores" "mosn.io/layotto/components/file/local" - "mosn.io/layotto/components/file/s3/alicloud" "mosn.io/layotto/components/file/s3/aws" "mosn.io/layotto/components/file/s3/minio" + mock_state "mosn.io/layotto/pkg/mock/components/state" + dbindings "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/bindings/http" "mosn.io/pkg/log" @@ -127,6 +117,7 @@ import ( lock_consul "mosn.io/layotto/components/lock/consul" lock_etcd "mosn.io/layotto/components/lock/etcd" lock_inmemory "mosn.io/layotto/components/lock/in-memory" + lock_mongo "mosn.io/layotto/components/lock/mongo" lock_redis "mosn.io/layotto/components/lock/redis" lock_zookeeper "mosn.io/layotto/components/lock/zookeeper" runtime_lock "mosn.io/layotto/pkg/runtime/lock" @@ -134,6 +125,7 @@ import ( // Sequencer sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory" + sequencer_mongo "mosn.io/layotto/components/sequencer/mongo" sequencer_redis "mosn.io/layotto/components/sequencer/redis" sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" @@ -150,37 +142,64 @@ import ( "github.com/urfave/cli" "google.golang.org/grpc" - "mosn.io/mosn/pkg/featuregate" _ "mosn.io/mosn/pkg/filter/network/grpc" mgrpc "mosn.io/mosn/pkg/filter/network/grpc" _ "mosn.io/mosn/pkg/filter/network/proxy" _ "mosn.io/mosn/pkg/filter/stream/flowcontrol" + _ "mosn.io/mosn/pkg/filter/stream/grpcmetric" _ "mosn.io/mosn/pkg/metrics/sink" _ "mosn.io/mosn/pkg/metrics/sink/prometheus" - "mosn.io/mosn/pkg/mosn" _ "mosn.io/mosn/pkg/network" - "mosn.io/mosn/pkg/protocol" - "mosn.io/mosn/pkg/protocol/xprotocol" - "mosn.io/mosn/pkg/protocol/xprotocol/bolt" - "mosn.io/mosn/pkg/protocol/xprotocol/dubbo" _ "mosn.io/mosn/pkg/stream/http" - xstream "mosn.io/mosn/pkg/stream/xprotocol" - "mosn.io/mosn/pkg/trace" - tracehttp "mosn.io/mosn/pkg/trace/sofa/http" - xtrace "mosn.io/mosn/pkg/trace/sofa/xprotocol" - tracebolt "mosn.io/mosn/pkg/trace/sofa/xprotocol/bolt" _ "mosn.io/mosn/pkg/wasm/runtime/wasmer" _ "mosn.io/pkg/buffer" - "mosn.io/layotto/diagnostics" _ "mosn.io/layotto/pkg/filter/network/tcpcopy" + l8_grpc "mosn.io/layotto/pkg/grpc" "mosn.io/layotto/pkg/runtime" + _ "mosn.io/layotto/pkg/wasm" + + _ "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3" + _ "mosn.io/mosn/istio/istio1106" + _ "mosn.io/mosn/istio/istio1106/filter/stream/jwtauthn" + _ "mosn.io/mosn/istio/istio1106/filter/stream/mixer" + _ "mosn.io/mosn/istio/istio1106/filter/stream/stats" + _ "mosn.io/mosn/istio/istio1106/sds" + _ "mosn.io/mosn/istio/istio1106/xds" + _ "mosn.io/mosn/pkg/filter/listener/originaldst" + _ "mosn.io/mosn/pkg/filter/network/connectionmanager" + _ "mosn.io/mosn/pkg/filter/network/streamproxy" + _ "mosn.io/mosn/pkg/filter/network/tunnel" + _ "mosn.io/mosn/pkg/filter/stream/dsl" + _ "mosn.io/mosn/pkg/filter/stream/dubbo" + _ "mosn.io/mosn/pkg/filter/stream/faultinject" + _ "mosn.io/mosn/pkg/filter/stream/faulttolerance" + _ "mosn.io/mosn/pkg/filter/stream/gzip" + _ "mosn.io/mosn/pkg/filter/stream/headertometadata" + _ "mosn.io/mosn/pkg/filter/stream/ipaccess" + _ "mosn.io/mosn/pkg/filter/stream/mirror" + _ "mosn.io/mosn/pkg/filter/stream/payloadlimit" + _ "mosn.io/mosn/pkg/filter/stream/proxywasm" + _ "mosn.io/mosn/pkg/filter/stream/seata" + _ "mosn.io/mosn/pkg/filter/stream/transcoder/http2bolt" + _ "mosn.io/mosn/pkg/filter/stream/transcoder/httpconv" + _ "mosn.io/mosn/pkg/protocol" + _ "mosn.io/mosn/pkg/protocol/xprotocol" + _ "mosn.io/mosn/pkg/router" + _ "mosn.io/mosn/pkg/server/keeper" + _ "mosn.io/mosn/pkg/stream/http2" + _ "mosn.io/mosn/pkg/stream/xprotocol" + _ "mosn.io/mosn/pkg/trace/jaeger" + _ "mosn.io/mosn/pkg/trace/skywalking" + _ "mosn.io/mosn/pkg/trace/skywalking/http" + _ "mosn.io/mosn/pkg/trace/sofa/http" + _ "mosn.io/mosn/pkg/trace/sofa/xprotocol" + _ "mosn.io/mosn/pkg/trace/sofa/xprotocol/bolt" + _ "mosn.io/mosn/pkg/upstream/healthcheck" + _ "mosn.io/mosn/pkg/upstream/servicediscovery/dubbod" + helloworld_api "mosn.io/layotto/cmd/layotto_multiple_api/helloworld" _ "mosn.io/layotto/diagnostics/exporter_iml" - "mosn.io/layotto/diagnostics/jaeger" - lprotocol "mosn.io/layotto/diagnostics/protocol" - lsky "mosn.io/layotto/diagnostics/skywalking" - "mosn.io/layotto/diagnostics/zipkin" ) // loggerForDaprComp is constructed for reusing dapr's components. @@ -188,6 +207,7 @@ var loggerForDaprComp = logger.NewLogger("reuse.dapr.component") // GitVersion mosn version is specified by latest tag var GitVersion = "" +var IstioVersion = "1.10.6" func init() { mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer) @@ -381,6 +401,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_lock.NewFactory("consul", func() lock.LockStore { return lock_consul.NewConsulLock(log.DefaultLogger) }), + runtime_lock.NewFactory("mongo", func() lock.LockStore { + return lock_mongo.NewMongoLock(log.DefaultLogger) + }), runtime_lock.NewFactory("in-memory", func() lock.LockStore { return lock_inmemory.NewInMemoryLock() }), @@ -404,6 +427,9 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) }), + runtime_sequencer.NewFactory("mongo", func() sequencer.Store { + return sequencer_mongo.NewMongoSequencer(log.DefaultLogger) + }), runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { return sequencer_inmemory.NewInMemorySequencer() }), @@ -434,8 +460,7 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp secretstores_loader.NewFactory("local.env", func() secretstores.SecretStore { return secretstore_env.NewEnvSecretStore(loggerForDaprComp) }), - ), - // Custom components + ), // Custom components runtime.WithCustomComponentFactory("helloworld", custom.NewComponentFactory("in-memory", component.NewInMemoryHelloWorld), custom.NewComponentFactory("goodbye", component.NewSayGoodbyeHelloWorld), @@ -444,98 +469,6 @@ func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrp return server, err } -var cmdStart = cli.Command{ - Name: "start", - Usage: "start runtime. For example: ./layotto start -c configs/config_standalone.json", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "config, c", - Usage: "Load configuration from `FILE`", - EnvVar: "RUNTIME_CONFIG", - Value: "configs/config.json", - }, cli.StringFlag{ - Name: "feature-gates, f", - Usage: "config feature gates", - EnvVar: "FEATURE_GATES", - }, - }, - Action: func(c *cli.Context) error { - app := mosn.NewMosn() - stm := stagemanager.InitStageManager(c, c.String("config"), app) - - stm.AppendParamsParsedStage(ExtensionsRegister) - stm.AppendParamsParsedStage(func(c *cli.Context) { - err := featuregate.Set(c.String("feature-gates")) - if err != nil { - os.Exit(1) - } - }) - - stm.AppendInitStage(mosn.DefaultInitStage) - - stm.AppendPreStartStage(mosn.DefaultPreStartStage) // called finally stage by default - - stm.AppendStartStage(mosn.DefaultStartStage) - - stm.AppendAfterStartStage(SetActuatorAfterStart) - - stm.Run() - - actuator.GetRuntimeReadinessIndicator().SetStarted() - actuator.GetRuntimeLivenessIndicator().SetStarted() - // wait mosn finished - stm.WaitFinish() - return nil - }, -} - -func SetActuatorAfterStart(_ stagemanager.Application) { - // register component actuator - component_actuators.RangeAllIndicators( - func(name string, v *component_actuators.ComponentsIndicator) bool { - if v != nil { - health.AddLivenessIndicator(name, v.LivenessIndicator) - health.AddReadinessIndicator(name, v.ReadinessIndicator) - } - return true - }) - // set started - actuator.GetRuntimeReadinessIndicator().SetStarted() - actuator.GetRuntimeLivenessIndicator().SetStarted() -} - -// ExtensionsRegister for register mosn rpc extensions -func ExtensionsRegister(_ *cli.Context) { - // 1. tracer driver register - // Q: What is a tracer driver ? - // A: MOSN implement a group of trace drivers, but only a configured driver will be loaded. - // A tracer driver can create different tracer by different protocol. - // When MOSN receive a request stream, MOSN will try to start a tracer according to the request protocol - // For more details,see https://mosn.io/blog/posts/skywalking-support/ - trace.RegisterDriver("SOFATracer", trace.NewDefaultDriverImpl()) - - // 2. xprotocol action register - // ResgisterXProtocolAction is MOSN's xprotocol framework's extensions. - // when a xprotocol implementation (defined by api.XProtocolCodec) registered, the registered action will be called. - xprotocol.ResgisterXProtocolAction(xstream.NewConnPool, xstream.NewStreamFactory, func(codec api.XProtocolCodec) { - name := codec.ProtocolName() - trace.RegisterTracerBuilder("SOFATracer", name, xtrace.NewTracer) - }) - - // 3. register protocols that are used by layotto. - // RegisterXProtocolCodec add a new xprotocol implementation, which is a wrapper for protocol register - _ = xprotocol.RegisterXProtocolCodec(&bolt.XCodec{}) - _ = xprotocol.RegisterXProtocolCodec(&dubbo.XCodec{}) - - // 4. register tracer - xtrace.RegisterDelegate(bolt.ProtocolName, tracebolt.Boltv1Delegate) - trace.RegisterTracerBuilder("SOFATracer", protocol.HTTP1, tracehttp.NewTracer) - trace.RegisterTracerBuilder("SOFATracer", "layotto", diagnostics.NewTracer) - trace.RegisterTracerBuilder(skywalking.SkyDriverName, lprotocol.Layotto, lsky.NewGrpcSkyTracer) - trace.RegisterTracerBuilder(mosn_jaeger.DriverName, lprotocol.Layotto, jaeger.NewGrpcJaegerTracer) - trace.RegisterTracerBuilder(mosn_zipkin.DriverName, lprotocol.Layotto, zipkin.NewGrpcZipTracer) -} - func main() { app := newRuntimeApp(&cmdStart) registerAppInfo(app) @@ -548,6 +481,8 @@ func registerAppInfo(app *cli.App) { appInfo.Version = app.Version appInfo.Compiled = app.Compiled actuator.SetAppInfoSingleton(appInfo) + // set istio version + istio.IstioVersion = IstioVersion } func newRuntimeApp(startCmd *cli.Command) *cli.App { @@ -562,6 +497,8 @@ func newRuntimeApp(startCmd *cli.Command) *cli.App { // commands app.Commands = []cli.Command{ cmdStart, + cmdStop, + cmdReload, } // action app.Action = func(c *cli.Context) error { diff --git a/cmd/layotto_without_xds/cmd.go b/cmd/layotto_without_xds/cmd.go new file mode 100644 index 0000000000..0f4fba5a20 --- /dev/null +++ b/cmd/layotto_without_xds/cmd.go @@ -0,0 +1,199 @@ +package main + +import ( + "os" + "runtime" + + "mosn.io/api" + v2 "mosn.io/mosn/pkg/config/v2" + "mosn.io/mosn/pkg/configmanager" + "mosn.io/mosn/pkg/log" + "mosn.io/mosn/pkg/metrics" + "mosn.io/mosn/pkg/mosn" + "mosn.io/mosn/pkg/protocol" + "mosn.io/mosn/pkg/protocol/xprotocol" + "mosn.io/mosn/pkg/protocol/xprotocol/bolt" + "mosn.io/mosn/pkg/protocol/xprotocol/dubbo" + "mosn.io/mosn/pkg/stagemanager" + xstream "mosn.io/mosn/pkg/stream/xprotocol" + "mosn.io/mosn/pkg/trace" + mosn_jaeger "mosn.io/mosn/pkg/trace/jaeger" + "mosn.io/mosn/pkg/trace/skywalking" + tracehttp "mosn.io/mosn/pkg/trace/sofa/http" + xtrace "mosn.io/mosn/pkg/trace/sofa/xprotocol" + tracebolt "mosn.io/mosn/pkg/trace/sofa/xprotocol/bolt" + mosn_zipkin "mosn.io/mosn/pkg/trace/zipkin" + "mosn.io/pkg/buffer" + + component_actuators "mosn.io/layotto/components/pkg/actuators" + "mosn.io/layotto/diagnostics" + "mosn.io/layotto/diagnostics/jaeger" + lprotocol "mosn.io/layotto/diagnostics/protocol" + lsky "mosn.io/layotto/diagnostics/skywalking" + "mosn.io/layotto/diagnostics/zipkin" + + // Actuator + "mosn.io/layotto/pkg/actuator/health" + "mosn.io/layotto/pkg/integrate/actuator" + + "github.com/urfave/cli" + "mosn.io/mosn/pkg/featuregate" +) + +var ( + flagToMosnLogLevel = map[string]string{ + "trace": "TRACE", + "debug": "DEBUG", + "info": "INFO", + "warning": "WARN", + "error": "ERROR", + "critical": "FATAL", + "off": "OFF", + } + + cmdStart = cli.Command{ + Name: "start", + Usage: "start runtime. For example: ./layotto start -c configs/config_standalone.json", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config, c", + Usage: "Load configuration from `FILE`", + EnvVar: "RUNTIME_CONFIG", + Value: "configs/config.json", + }, cli.StringFlag{ + Name: "feature-gates, f", + Usage: "config feature gates", + EnvVar: "FEATURE_GATES", + }, cli.StringFlag{ + Name: "log-level, l", + Usage: "mosn log level, trace|debug|info|warning|error|critical|off", + EnvVar: "LOG_LEVEL", + }, + }, + Action: func(c *cli.Context) error { + app := mosn.NewMosn() + stm := stagemanager.InitStageManager(c, c.String("config"), app) + + // if needs featuregate init in parameter stage or init stage + // append a new stage and called featuregate.ExecuteInitFunc(keys...) + // parameter parsed registered + stm.AppendParamsParsedStage(ExtensionsRegister) + + stm.AppendParamsParsedStage(DefaultParamsParsed) + + // init Stage + stm.AppendInitStage(mosn.DefaultInitStage) + stm.AppendInitStage(func(_ *v2.MOSNConfig) { + // set version and go version + metrics.SetVersion(GitVersion) + metrics.SetGoVersion(runtime.Version()) + }) + // pre-startup + stm.AppendPreStartStage(mosn.DefaultPreStartStage) // called finally stage by default + // startup + stm.AppendStartStage(mosn.DefaultStartStage) + // after-startup + stm.AppendAfterStartStage(SetActuatorAfterStart) + // execute all stages + stm.RunAll() + return nil + }, + } + + cmdStop = cli.Command{ + Name: "stop", + Usage: "stop mosn proxy", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config, c", + Usage: "load configuration from `FILE`", + EnvVar: "MOSN_CONFIG", + Value: "configs/mosn_config.json", + }, + }, + Action: func(c *cli.Context) (err error) { + app := mosn.NewMosn() + stm := stagemanager.InitStageManager(c, c.String("config"), app) + stm.AppendInitStage(mosn.InitDefaultPath) + return stm.StopMosnProcess() + }, + } + + cmdReload = cli.Command{ + Name: "reload", + Usage: "reconfiguration", + Action: func(c *cli.Context) error { + return nil + }, + } +) + +func SetActuatorAfterStart(_ stagemanager.Application) { + // register component actuator + component_actuators.RangeAllIndicators( + func(name string, v *component_actuators.ComponentsIndicator) bool { + if v != nil { + health.AddLivenessIndicator(name, v.LivenessIndicator) + health.AddReadinessIndicator(name, v.ReadinessIndicator) + } + return true + }) + // set started + actuator.GetRuntimeReadinessIndicator().SetStarted() + actuator.GetRuntimeLivenessIndicator().SetStarted() +} + +func DefaultParamsParsed(c *cli.Context) { + // log level control + flagLogLevel := c.String("log-level") + if mosnLogLevel, ok := flagToMosnLogLevel[flagLogLevel]; ok { + if mosnLogLevel == "OFF" { + log.GetErrorLoggerManagerInstance().Disable() + } else { + log.GetErrorLoggerManagerInstance().SetLogLevelControl(configmanager.ParseLogLevel(mosnLogLevel)) + } + } + // set feature gates + err := featuregate.Set(c.String("feature-gates")) + if err != nil { + log.StartLogger.Infof("[mosn] [start] parse feature-gates flag fail : %+v", err) + os.Exit(1) + } +} + +// ExtensionsRegister for register mosn rpc extensions +func ExtensionsRegister(_ *cli.Context) { + // 1. tracer driver register + // Q: What is a tracer driver ? + // A: MOSN implement a group of trace drivers, but only a configured driver will be loaded. + // A tracer driver can create different tracer by different protocol. + // When MOSN receive a request stream, MOSN will try to start a tracer according to the request protocol. + // For more details,see https://mosn.io/blog/posts/skywalking-support/ + trace.RegisterDriver("SOFATracer", trace.NewDefaultDriverImpl()) + + // 2. xprotocol action register + // ResgisterXProtocolAction is MOSN's xprotocol framework's extensions. + // when a xprotocol implementation (defined by api.XProtocolCodec) registered, the registered action will be called. + xprotocol.ResgisterXProtocolAction(xstream.NewConnPool, xstream.NewStreamFactory, func(codec api.XProtocolCodec) { + name := codec.ProtocolName() + trace.RegisterTracerBuilder("SOFATracer", name, xtrace.NewTracer) + }) + + // 3. register protocols that are used by layotto. + // RegisterXProtocolCodec add a new xprotocol implementation, which is a wrapper for protocol register + _ = xprotocol.RegisterXProtocolCodec(&bolt.XCodec{}) + _ = xprotocol.RegisterXProtocolCodec(&dubbo.XCodec{}) + + // 4. register tracer + xtrace.RegisterDelegate(bolt.ProtocolName, tracebolt.Boltv1Delegate) + trace.RegisterTracerBuilder("SOFATracer", protocol.HTTP1, tracehttp.NewTracer) + trace.RegisterTracerBuilder("SOFATracer", lprotocol.Layotto, diagnostics.NewTracer) + trace.RegisterTracerBuilder(skywalking.SkyDriverName, lprotocol.Layotto, lsky.NewGrpcSkyTracer) + trace.RegisterTracerBuilder(mosn_jaeger.DriverName, lprotocol.Layotto, jaeger.NewGrpcJaegerTracer) + trace.RegisterTracerBuilder(mosn_zipkin.DriverName, lprotocol.Layotto, zipkin.NewGrpcZipTracer) + + // register buffer logger + buffer.SetLogFunc(func(msg string) { + log.DefaultLogger.Errorf("[iobuffer] iobuffer error log info: %s", msg) + }) +} diff --git a/cmd/layotto_without_xds/main.go b/cmd/layotto_without_xds/main.go new file mode 100644 index 0000000000..f836efa81e --- /dev/null +++ b/cmd/layotto_without_xds/main.go @@ -0,0 +1,484 @@ +/* + * Copyright 2021 Layotto Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "encoding/json" + "os" + "strconv" + "time" + + "github.com/dapr/components-contrib/secretstores" + "github.com/dapr/components-contrib/secretstores/aws/parameterstore" + "github.com/dapr/components-contrib/secretstores/aws/secretmanager" + "github.com/dapr/components-contrib/secretstores/azure/keyvault" + gcp_secretmanager "github.com/dapr/components-contrib/secretstores/gcp/secretmanager" + "github.com/dapr/components-contrib/secretstores/hashicorp/vault" + sercetstores_kubernetes "github.com/dapr/components-contrib/secretstores/kubernetes" + secretstore_env "github.com/dapr/components-contrib/secretstores/local/env" + secretstore_file "github.com/dapr/components-contrib/secretstores/local/file" + + "mosn.io/layotto/pkg/grpc/default_api" + secretstores_loader "mosn.io/layotto/pkg/runtime/secretstores" + + "mosn.io/layotto/components/file/local" + "mosn.io/layotto/components/file/s3/alicloud" + "mosn.io/layotto/components/file/s3/aws" + "mosn.io/layotto/components/file/s3/minio" + + mock_state "mosn.io/layotto/pkg/mock/components/state" + + dbindings "github.com/dapr/components-contrib/bindings" + "github.com/dapr/components-contrib/bindings/http" + "mosn.io/pkg/log" + + "mosn.io/layotto/components/configstores/etcdv3" + "mosn.io/layotto/components/file" + "mosn.io/layotto/components/sequencer" + "mosn.io/layotto/pkg/runtime/bindings" + runtime_sequencer "mosn.io/layotto/pkg/runtime/sequencer" + + // Hello + "mosn.io/layotto/components/hello" + "mosn.io/layotto/components/hello/helloworld" + + // Configuration + "mosn.io/layotto/components/configstores" + "mosn.io/layotto/components/configstores/apollo" + + // Pub/Sub + dapr_comp_pubsub "github.com/dapr/components-contrib/pubsub" + pubsub_snssqs "github.com/dapr/components-contrib/pubsub/aws/snssqs" + pubsub_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" + "github.com/dapr/components-contrib/pubsub/azure/servicebus" + pubsub_gcp "github.com/dapr/components-contrib/pubsub/gcp/pubsub" + pubsub_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast" + pubsub_inmemory "github.com/dapr/components-contrib/pubsub/in-memory" + pubsub_kafka "github.com/dapr/components-contrib/pubsub/kafka" + pubsub_mqtt "github.com/dapr/components-contrib/pubsub/mqtt" + "github.com/dapr/components-contrib/pubsub/natsstreaming" + pubsub_pulsar "github.com/dapr/components-contrib/pubsub/pulsar" + "github.com/dapr/components-contrib/pubsub/rabbitmq" + pubsub_redis "github.com/dapr/components-contrib/pubsub/redis" + "github.com/dapr/kit/logger" + + "mosn.io/layotto/pkg/runtime/pubsub" + + // RPC + "mosn.io/layotto/components/rpc" + mosninvoker "mosn.io/layotto/components/rpc/invoker/mosn" + + // State Stores + "github.com/dapr/components-contrib/state" + "github.com/dapr/components-contrib/state/aerospike" + state_dynamodb "github.com/dapr/components-contrib/state/aws/dynamodb" + state_azure_blobstorage "github.com/dapr/components-contrib/state/azure/blobstorage" + state_cosmosdb "github.com/dapr/components-contrib/state/azure/cosmosdb" + state_azure_tablestorage "github.com/dapr/components-contrib/state/azure/tablestorage" + "github.com/dapr/components-contrib/state/cassandra" + "github.com/dapr/components-contrib/state/cloudstate" + "github.com/dapr/components-contrib/state/couchbase" + "github.com/dapr/components-contrib/state/gcp/firestore" + "github.com/dapr/components-contrib/state/hashicorp/consul" + "github.com/dapr/components-contrib/state/hazelcast" + "github.com/dapr/components-contrib/state/memcached" + "github.com/dapr/components-contrib/state/mongodb" + state_mysql "github.com/dapr/components-contrib/state/mysql" + "github.com/dapr/components-contrib/state/postgresql" + state_redis "github.com/dapr/components-contrib/state/redis" + "github.com/dapr/components-contrib/state/rethinkdb" + "github.com/dapr/components-contrib/state/sqlserver" + "github.com/dapr/components-contrib/state/zookeeper" + + runtime_state "mosn.io/layotto/pkg/runtime/state" + + // Lock + "mosn.io/layotto/components/lock" + lock_consul "mosn.io/layotto/components/lock/consul" + lock_etcd "mosn.io/layotto/components/lock/etcd" + lock_inmemory "mosn.io/layotto/components/lock/in-memory" + lock_mongo "mosn.io/layotto/components/lock/mongo" + lock_redis "mosn.io/layotto/components/lock/redis" + lock_zookeeper "mosn.io/layotto/components/lock/zookeeper" + runtime_lock "mosn.io/layotto/pkg/runtime/lock" + + // Sequencer + sequencer_etcd "mosn.io/layotto/components/sequencer/etcd" + sequencer_inmemory "mosn.io/layotto/components/sequencer/in-memory" + sequencer_mongo "mosn.io/layotto/components/sequencer/mongo" + sequencer_redis "mosn.io/layotto/components/sequencer/redis" + sequencer_zookeeper "mosn.io/layotto/components/sequencer/zookeeper" + + // File + "mosn.io/layotto/components/file/s3/qiniu" + "mosn.io/layotto/components/file/s3/tencentcloud" + + // Actuator + _ "mosn.io/layotto/pkg/actuator" + "mosn.io/layotto/pkg/actuator/health" + actuatorInfo "mosn.io/layotto/pkg/actuator/info" + _ "mosn.io/layotto/pkg/filter/stream/actuator/http" + "mosn.io/layotto/pkg/integrate/actuator" + + "github.com/urfave/cli" + "google.golang.org/grpc" + _ "mosn.io/mosn/pkg/filter/network/grpc" + mgrpc "mosn.io/mosn/pkg/filter/network/grpc" + _ "mosn.io/mosn/pkg/filter/network/proxy" + _ "mosn.io/mosn/pkg/filter/stream/flowcontrol" + _ "mosn.io/mosn/pkg/filter/stream/grpcmetric" + _ "mosn.io/mosn/pkg/metrics/sink" + _ "mosn.io/mosn/pkg/metrics/sink/prometheus" + _ "mosn.io/mosn/pkg/network" + _ "mosn.io/mosn/pkg/stream/http" + _ "mosn.io/mosn/pkg/wasm/runtime/wasmer" + _ "mosn.io/pkg/buffer" + + _ "mosn.io/layotto/pkg/filter/network/tcpcopy" + l8_grpc "mosn.io/layotto/pkg/grpc" + "mosn.io/layotto/pkg/runtime" + _ "mosn.io/layotto/pkg/wasm" + + _ "mosn.io/mosn/pkg/filter/listener/originaldst" + _ "mosn.io/mosn/pkg/filter/network/connectionmanager" + _ "mosn.io/mosn/pkg/filter/network/streamproxy" + _ "mosn.io/mosn/pkg/filter/network/tunnel" + _ "mosn.io/mosn/pkg/filter/stream/dsl" + _ "mosn.io/mosn/pkg/filter/stream/dubbo" + _ "mosn.io/mosn/pkg/filter/stream/faultinject" + _ "mosn.io/mosn/pkg/filter/stream/faulttolerance" + _ "mosn.io/mosn/pkg/filter/stream/gzip" + _ "mosn.io/mosn/pkg/filter/stream/headertometadata" + _ "mosn.io/mosn/pkg/filter/stream/ipaccess" + _ "mosn.io/mosn/pkg/filter/stream/mirror" + _ "mosn.io/mosn/pkg/filter/stream/payloadlimit" + _ "mosn.io/mosn/pkg/filter/stream/proxywasm" + _ "mosn.io/mosn/pkg/filter/stream/seata" + _ "mosn.io/mosn/pkg/filter/stream/transcoder/http2bolt" + _ "mosn.io/mosn/pkg/filter/stream/transcoder/httpconv" + _ "mosn.io/mosn/pkg/protocol" + _ "mosn.io/mosn/pkg/protocol/xprotocol" + _ "mosn.io/mosn/pkg/router" + _ "mosn.io/mosn/pkg/server/keeper" + _ "mosn.io/mosn/pkg/stream/http2" + _ "mosn.io/mosn/pkg/stream/xprotocol" + _ "mosn.io/mosn/pkg/trace/jaeger" + _ "mosn.io/mosn/pkg/trace/skywalking" + _ "mosn.io/mosn/pkg/trace/skywalking/http" + _ "mosn.io/mosn/pkg/trace/sofa/http" + _ "mosn.io/mosn/pkg/trace/sofa/xprotocol" + _ "mosn.io/mosn/pkg/trace/sofa/xprotocol/bolt" + _ "mosn.io/mosn/pkg/upstream/healthcheck" + _ "mosn.io/mosn/pkg/upstream/servicediscovery/dubbod" + + _ "mosn.io/layotto/diagnostics/exporter_iml" +) + +// loggerForDaprComp is constructed for reusing dapr's components. +var loggerForDaprComp = logger.NewLogger("reuse.dapr.component") + +// GitVersion mosn version is specified by latest tag +var GitVersion = "" + +func init() { + mgrpc.RegisterServerHandler("runtime", NewRuntimeGrpcServer) + // Register default actuator implementations + actuatorInfo.AddInfoContributor("app", actuator.GetAppContributor()) + health.AddReadinessIndicator("runtime_startup", actuator.GetRuntimeReadinessIndicator()) + health.AddLivenessIndicator("runtime_startup", actuator.GetRuntimeLivenessIndicator()) +} + +func NewRuntimeGrpcServer(data json.RawMessage, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + var err error + defer func() { + if err != nil { + // fail fast if error occurs during startup. + // The reason we panic in a new goroutine is to prevent mosn from recovering. + go func() { + log.DefaultLogger.Errorf("An error occurred during startup : %v", err) + panic(err) + }() + } + }() + // 1. parse config + cfg, err := runtime.ParseRuntimeConfig(data) + if err != nil { + return nil, err + } + // 2. new instance + rt := runtime.NewMosnRuntime(cfg) + rt.AppendInitRuntimeStage(runtime.DefaultInitRuntimeStage) + // 3. run + server, err := rt.Run( + runtime.WithGrpcOptions(opts...), + // wrap the grpc server with actuator + runtime.WithNewServer(func(apis []l8_grpc.GrpcAPI, opts ...grpc.ServerOption) (mgrpc.RegisteredServer, error) { + server, err := l8_grpc.NewDefaultServer(apis, opts...) + if err != nil { + return nil, err + } + return actuator.NewGrpcServerWithActuator(server) + }), + // register your gRPC API here + runtime.WithGrpcAPI( + default_api.NewGrpcAPI, + ), + // Hello + runtime.WithHelloFactory( + hello.NewHelloFactory("helloworld", helloworld.NewHelloWorld), + ), + // Configuration + runtime.WithConfigStoresFactory( + configstores.NewStoreFactory("apollo", apollo.NewStore), + configstores.NewStoreFactory("etcd", etcdv3.NewStore), + ), + + // RPC + runtime.WithRpcFactory( + rpc.NewRpcFactory("mosn", mosninvoker.NewMosnInvoker), + ), + + // File + runtime.WithFileFactory( + file.NewFileFactory("aliOSS", alicloud.NewAliCloudOSS), + file.NewFileFactory("minioOSS", minio.NewMinioOss), + file.NewFileFactory("awsOSS", aws.NewAwsOss), + file.NewFileFactory("tencentCloudOSS", tencentcloud.NewTencentCloudOSS), + file.NewFileFactory("local", local.NewLocalStore), + file.NewFileFactory("qiniuOSS", qiniu.NewQiniuOSS), + ), + + // PubSub + runtime.WithPubSubFactory( + pubsub.NewFactory("redis", func() dapr_comp_pubsub.PubSub { + return pubsub_redis.NewRedisStreams(loggerForDaprComp) + }), + pubsub.NewFactory("natsstreaming", func() dapr_comp_pubsub.PubSub { + return natsstreaming.NewNATSStreamingPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("azure.eventhubs", func() dapr_comp_pubsub.PubSub { + return pubsub_eventhubs.NewAzureEventHubs(loggerForDaprComp) + }), + pubsub.NewFactory("azure.servicebus", func() dapr_comp_pubsub.PubSub { + return servicebus.NewAzureServiceBus(loggerForDaprComp) + }), + pubsub.NewFactory("rabbitmq", func() dapr_comp_pubsub.PubSub { + return rabbitmq.NewRabbitMQ(loggerForDaprComp) + }), + pubsub.NewFactory("hazelcast", func() dapr_comp_pubsub.PubSub { + return pubsub_hazelcast.NewHazelcastPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("gcp.pubsub", func() dapr_comp_pubsub.PubSub { + return pubsub_gcp.NewGCPPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("kafka", func() dapr_comp_pubsub.PubSub { + return pubsub_kafka.NewKafka(loggerForDaprComp) + }), + pubsub.NewFactory("snssqs", func() dapr_comp_pubsub.PubSub { + return pubsub_snssqs.NewSnsSqs(loggerForDaprComp) + }), + pubsub.NewFactory("mqtt", func() dapr_comp_pubsub.PubSub { + return pubsub_mqtt.NewMQTTPubSub(loggerForDaprComp) + }), + pubsub.NewFactory("pulsar", func() dapr_comp_pubsub.PubSub { + return pubsub_pulsar.NewPulsar(loggerForDaprComp) + }), + pubsub.NewFactory("in-memory", func() dapr_comp_pubsub.PubSub { + return pubsub_inmemory.New(loggerForDaprComp) + }), + ), + // State + runtime.WithStateFactory( + runtime_state.NewFactory("in-memory", func() state.Store { + return mock_state.New(loggerForDaprComp) + }), + runtime_state.NewFactory("redis", func() state.Store { + return state_redis.NewRedisStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("consul", func() state.Store { + return consul.NewConsulStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.blobstorage", func() state.Store { + return state_azure_blobstorage.NewAzureBlobStorageStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.cosmosdb", func() state.Store { + return state_cosmosdb.NewCosmosDBStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("azure.tablestorage", func() state.Store { + return state_azure_tablestorage.NewAzureTablesStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("cassandra", func() state.Store { + return cassandra.NewCassandraStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("memcached", func() state.Store { + return memcached.NewMemCacheStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("mongodb", func() state.Store { + return mongodb.NewMongoDB(loggerForDaprComp) + }), + runtime_state.NewFactory("zookeeper", func() state.Store { + return zookeeper.NewZookeeperStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("gcp.firestore", func() state.Store { + return firestore.NewFirestoreStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("postgresql", func() state.Store { + return postgresql.NewPostgreSQLStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("sqlserver", func() state.Store { + return sqlserver.NewSQLServerStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("hazelcast", func() state.Store { + return hazelcast.NewHazelcastStore(loggerForDaprComp) + }), + runtime_state.NewFactory("cloudstate.crdt", func() state.Store { + return cloudstate.NewCRDT(loggerForDaprComp) + }), + runtime_state.NewFactory("couchbase", func() state.Store { + return couchbase.NewCouchbaseStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("aerospike", func() state.Store { + return aerospike.NewAerospikeStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("rethinkdb", func() state.Store { + return rethinkdb.NewRethinkDBStateStore(loggerForDaprComp) + }), + runtime_state.NewFactory("aws.dynamodb", state_dynamodb.NewDynamoDBStateStore), + runtime_state.NewFactory("mysql", func() state.Store { + return state_mysql.NewMySQLStateStore(loggerForDaprComp) + }), + ), + // Lock + runtime.WithLockFactory( + runtime_lock.NewFactory("redis_cluster", func() lock.LockStore { + return lock_redis.NewClusterRedisLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("redis", func() lock.LockStore { + return lock_redis.NewStandaloneRedisLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("zookeeper", func() lock.LockStore { + return lock_zookeeper.NewZookeeperLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("etcd", func() lock.LockStore { + return lock_etcd.NewEtcdLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("consul", func() lock.LockStore { + return lock_consul.NewConsulLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("mongo", func() lock.LockStore { + return lock_mongo.NewMongoLock(log.DefaultLogger) + }), + runtime_lock.NewFactory("in-memory", func() lock.LockStore { + return lock_inmemory.NewInMemoryLock() + }), + ), + + // bindings + runtime.WithOutputBindings( + bindings.NewOutputBindingFactory("http", func() dbindings.OutputBinding { + return http.NewHTTP(loggerForDaprComp) + }), + ), + + // Sequencer + runtime.WithSequencerFactory( + runtime_sequencer.NewFactory("etcd", func() sequencer.Store { + return sequencer_etcd.NewEtcdSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("redis", func() sequencer.Store { + return sequencer_redis.NewStandaloneRedisSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("zookeeper", func() sequencer.Store { + return sequencer_zookeeper.NewZookeeperSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("mongo", func() sequencer.Store { + return sequencer_mongo.NewMongoSequencer(log.DefaultLogger) + }), + runtime_sequencer.NewFactory("in-memory", func() sequencer.Store { + return sequencer_inmemory.NewInMemorySequencer() + }), + ), + // secretstores + runtime.WithSecretStoresFactory( + secretstores_loader.NewFactory("kubernetes", func() secretstores.SecretStore { + return sercetstores_kubernetes.NewKubernetesSecretStore(loggerForDaprComp) + }), + secretstores_loader.NewFactory("azure.keyvault", func() secretstores.SecretStore { + return keyvault.NewAzureKeyvaultSecretStore(loggerForDaprComp) + }), + secretstores_loader.NewFactory("hashicorp.vault", func() secretstores.SecretStore { + return vault.NewHashiCorpVaultSecretStore(loggerForDaprComp) + }), + secretstores_loader.NewFactory("aws.secretmanager", func() secretstores.SecretStore { + return secretmanager.NewSecretManager(loggerForDaprComp) + }), + secretstores_loader.NewFactory("aws.parameterstore", func() secretstores.SecretStore { + return parameterstore.NewParameterStore(loggerForDaprComp) + }), + secretstores_loader.NewFactory("gcp.secretmanager", func() secretstores.SecretStore { + return gcp_secretmanager.NewSecreteManager(loggerForDaprComp) + }), + secretstores_loader.NewFactory("local.file", func() secretstores.SecretStore { + return secretstore_file.NewLocalSecretStore(loggerForDaprComp) + }), + secretstores_loader.NewFactory("local.env", func() secretstores.SecretStore { + return secretstore_env.NewEnvSecretStore(loggerForDaprComp) + }), + )) + return server, err +} + +func main() { + app := newRuntimeApp(&cmdStart) + registerAppInfo(app) + _ = app.Run(os.Args) +} + +func registerAppInfo(app *cli.App) { + appInfo := actuator.NewAppInfo() + appInfo.Name = app.Name + appInfo.Version = app.Version + appInfo.Compiled = app.Compiled + actuator.SetAppInfoSingleton(appInfo) +} + +func newRuntimeApp(startCmd *cli.Command) *cli.App { + app := cli.NewApp() + app.Name = "Layotto" + app.Version = GitVersion + app.Compiled = time.Now() + app.Copyright = "(c) " + strconv.Itoa(time.Now().Year()) + " Layotto Authors" + app.Usage = "A fast and efficient cloud native application runtime based on MOSN." + app.Flags = cmdStart.Flags + + // commands + app.Commands = []cli.Command{ + cmdStart, + cmdStop, + cmdReload, + } + // action + app.Action = func(c *cli.Context) error { + if c.NumFlags() == 0 { + return cli.ShowAppHelp(c) + } + + return startCmd.Action.(func(c *cli.Context) error)(c) + } + + return app +}