forked from gotomicro/ego
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ego_function.go
320 lines (290 loc) · 9.2 KB
/
ego_function.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package ego
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"runtime"
"syscall"
sentinelmetrics "github.com/alibaba/sentinel-golang/metrics"
"github.com/gotomicro/ego/core/constant"
"github.com/gotomicro/ego/core/eapp"
"github.com/gotomicro/ego/core/econf"
"github.com/gotomicro/ego/core/econf/manager"
"github.com/gotomicro/ego/core/eflag"
"github.com/gotomicro/ego/core/elog"
"github.com/gotomicro/ego/core/esentinel"
"github.com/gotomicro/ego/core/etrace"
"github.com/gotomicro/ego/core/etrace/otel"
"github.com/gotomicro/ego/core/util/xcolor"
"github.com/gotomicro/ego/internal/retry"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/automaxprocs/maxprocs"
"golang.org/x/sync/errgroup"
)
// waitSignals wait signal
func (e *Ego) waitSignals() {
sig := make(chan os.Signal, 2)
signal.Notify(
sig,
e.opts.shutdownSignals...,
)
go func() {
s := <-sig
// 区分强制退出、优雅退出
grace := s != syscall.SIGQUIT
go func() {
// todo 父节点传context待考虑
stopCtx, cancel := context.WithTimeout(context.Background(), e.opts.stopTimeout)
defer func() {
signal.Stop(sig)
cancel()
}()
_ = e.Stop(stopCtx, grace)
<-stopCtx.Done()
// 记录服务器关闭时候,由于关闭过慢,无法正常关闭,被强制cancel
if errors.Is(stopCtx.Err(), context.DeadlineExceeded) {
elog.Error("waitSignals stop context err", elog.FieldErr(stopCtx.Err()))
}
}()
<-sig
elog.Error("waitSignals quit")
// 因为os.Signal长度为2,那么这里会阻塞住,如果发送两次信号量,强制退出
os.Exit(128 + int(s.(syscall.Signal))) // second signal. Exit directly.
}()
}
func (e *Ego) startServers(ctx context.Context) error {
// start multi servers
for _, s := range e.servers {
s := s
e.cycle.Run(func() (err error) {
_ = s.Init()
err = e.registerer.RegisterService(ctx, s.Info())
if err != nil {
e.logger.Error("register service err", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldErr(err))
}
defer func() {
_ = e.registerer.UnregisterService(ctx, s.Info())
}()
e.logger.Info("start server", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldAddr(s.Info().Label()))
defer e.logger.Info("stop server", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldErr(err), elog.FieldAddr(s.Info().Label()))
err = s.Start()
return
})
}
return nil
}
func (e *Ego) startOrderServers(ctx context.Context) (err error, isNeedStop bool) {
// start order servers
for _, s := range e.orderServers {
s := s
_ = s.Prepare()
// 如果存在短时任务,那么只执行短时任务
// 说明job在前面执行
// 如果job执行完后,下面的操作需要stop
if len(e.jobs) > 0 {
return e.startJobs(), true
}
_ = s.Init()
e.cycle.Run(func() (err error) {
err = e.registerer.RegisterService(ctx, s.Info())
if err != nil {
e.logger.Error("register service err", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldErr(err))
}
defer func() {
_ = e.registerer.UnregisterService(ctx, s.Info())
}()
e.logger.Info("start order server", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldAddr(s.Info().Label()))
defer e.logger.Info("stop order server", elog.FieldComponent(s.PackageName()), elog.FieldComponentName(s.Name()), elog.FieldErr(err), elog.FieldAddr(s.Info().Label()))
err = s.Start()
return
})
isHealth := false
for r := retry.Begin(); r.Continue(ctx); {
// 检测server的health接口
// 如果成功,那么就跳出循环
if s.Health() {
isHealth = true
break
}
}
if !isHealth {
return fmt.Errorf("start order server fail,err: " + s.Name()), true
}
}
return nil, false
}
func (e *Ego) startCrons() error {
for _, w := range e.crons {
w := w
e.cycle.Run(func() error {
return w.Start()
})
}
return nil
}
// todo handle error
func (e *Ego) startJobs() error {
if len(e.jobs) == 0 {
return nil
}
var jobs = make([]func() error, 0)
// wrap jobs
for _, runner := range e.jobs {
runner := runner
jobs = append(jobs, func() error {
return runner.Start()
})
}
eg := errgroup.Group{}
for _, fn := range jobs {
eg.Go(fn)
}
return eg.Wait()
}
// parseFlags init
func (e *Ego) parseFlags() error {
if !e.opts.disableFlagConfig {
eflag.Register(&eflag.StringFlag{
Name: "config",
Usage: "--config",
EnvVar: constant.EgoConfigPath,
Default: constant.DefaultConfig,
Action: func(name string, fs *eflag.FlagSet) {},
})
}
eflag.Register(&eflag.BoolFlag{
Name: "watch",
Usage: "--watch, watch config change event",
Default: true,
EnvVar: "CONFIG_WATCH",
})
eflag.Register(&eflag.BoolFlag{
Name: "version",
Usage: "--version, print version",
Default: false,
Action: func(string, *eflag.FlagSet) {
eapp.PrintVersion()
os.Exit(0)
},
})
eflag.Register(&eflag.StringFlag{
Name: "host",
Usage: "--host, print host",
EnvVar: constant.EnvAppHost,
Default: "0.0.0.0",
Action: func(string, *eflag.FlagSet) {},
})
return eflag.ParseWithArgs(e.opts.arguments)
}
// loadConfig init
func loadConfig() error {
var configAddr = eflag.String("config")
provider, parser, tag, err := manager.NewDataSource(configAddr, eflag.Bool("watch"))
// 如果不存在配置,找不到该文件路径,该错误只存在file类型
if err == manager.ErrDefaultConfigNotExist {
// 如果协议是file类型,并且是默认文件配置,那么判断下文件是否存在,如果不存在只告诉warning,什么都不做
elog.EgoLogger.Warn("no config... ", elog.FieldComponent(econf.PackageName), elog.String("addr", configAddr), elog.FieldErr(err))
return nil
}
// 如果存在错误,报错
if err != nil {
elog.EgoLogger.Panic("data source: provider error", elog.FieldComponent(econf.PackageName), elog.FieldErr(err))
}
// 如果不是,就要加载文件,加载不到panic
if err := econf.LoadFromDataSource(provider, parser, econf.WithTagName(tag)); err != nil {
elog.EgoLogger.Panic("data source: load config", elog.FieldComponent(econf.PackageName), elog.FieldErrKind("unmarshal config err"), elog.FieldErr(err))
}
elog.EgoLogger.Info("init config", elog.FieldComponent(econf.PackageName), elog.String("addr", configAddr))
return nil
}
// initLogger init application and Ego logger
func (e *Ego) initLogger() error {
if econf.Get(e.opts.configPrefix+"logger.default") != nil {
elog.DefaultLogger = elog.Load(e.opts.configPrefix + "logger.default").Build(elog.WithCallSkip(2)) // DefaultLogger 默认为2层
elog.EgoLogger.Info("reinit default logger", elog.FieldComponent(elog.PackageName))
e.opts.afterStopClean = append(e.opts.afterStopClean, elog.DefaultLogger.Flush)
}
if econf.Get(e.opts.configPrefix+"logger.ego") != nil {
elog.EgoLogger = elog.Load(e.opts.configPrefix + "logger.ego").Build(elog.WithDefaultFileName(elog.EgoLoggerName))
elog.EgoLogger.Info("reinit ego logger", elog.FieldComponent(elog.PackageName))
e.opts.afterStopClean = append(e.opts.afterStopClean, elog.EgoLogger.Flush)
}
return nil
}
// initTracer init global tracer
func (e *Ego) initTracer() error {
var (
container *otel.Config
)
if econf.Get(e.opts.configPrefix+"trace") != nil {
container = otel.Load(e.opts.configPrefix + "trace")
} else {
// 设置默认trace
container = otel.DefaultConfig()
}
// 禁用trace
if econf.GetBool(e.opts.configPrefix + "trace.disable") {
elog.EgoLogger.Info("disable trace", elog.FieldComponent("app"))
return nil
}
tracer := container.Build()
etrace.SetGlobalTracer(tracer)
e.opts.afterStopClean = append(e.opts.afterStopClean, container.Stop)
elog.EgoLogger.Info("init trace", elog.FieldComponent("app"))
return nil
}
// initSentinel 启动sentinel
func (e *Ego) initSentinel() error {
if econf.Get(e.opts.configPrefix+"sentinel") != nil {
esentinel.Load(e.opts.configPrefix + "sentinel").Build()
sentinelmetrics.RegisterSentinelMetrics(prometheus.DefaultRegisterer.(*prometheus.Registry))
}
return nil
}
// initMaxProcs init
func initMaxProcs() error {
if maxProcs := econf.GetInt("ego.maxProc"); maxProcs != 0 {
runtime.GOMAXPROCS(maxProcs)
} else {
if _, err := maxprocs.Set(); err != nil {
elog.EgoLogger.Error("init max procs", elog.FieldComponent("app"), elog.FieldErr(err))
}
}
elog.EgoLogger.Info("init app", elog.FieldComponent("app"), elog.Int("pid", os.Getpid()), elog.Int("coreNum", runtime.GOMAXPROCS(-1)))
return nil
}
// printBanner init
func (e *Ego) printBanner() error {
if e.opts.disableBanner {
return nil
}
const banner = `
_/_/_/_/ _/_/_/ _/_/
_/ _/ _/ _/
_/_/_/ _/ _/_/ _/ _/
_/ _/ _/ _/ _/
_/_/_/_/ _/_/_/ _/_/
Welcome to Ego, starting application ...
`
fmt.Println(xcolor.Blue(banner))
return nil
}
func runSerialFuncReturnError(fns []func() error) error {
for _, fn := range fns {
err := fn()
if err != nil {
return err
}
}
return nil
}
func runSerialFuncLogError(fns []func() error) {
for _, clean := range fns {
err := clean()
if err != nil {
elog.EgoLogger.Error("beforeStopClean err", elog.FieldComponent("app"), elog.FieldErr(err))
}
}
}