Skip to content

Commit 04453c2

Browse files
authored
Merge pull request #974 from c9s/refactor/isolation
refactor persistence for isolation
2 parents ef6a22d + 731e556 commit 04453c2

File tree

27 files changed

+129
-324
lines changed

27 files changed

+129
-324
lines changed

pkg/bbgo/environment.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error
283283
}
284284

285285
redisPersistence := service.NewRedisPersistenceService(conf.Redis)
286-
PersistenceServiceFacade.Redis = redisPersistence
286+
persistenceServiceFacade.Redis = redisPersistence
287287
}
288288

289289
if conf.Json != nil {
@@ -295,7 +295,7 @@ func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error
295295
}
296296

297297
jsonPersistence := &service.JsonPersistenceService{Directory: conf.Json.Directory}
298-
PersistenceServiceFacade.Json = jsonPersistence
298+
persistenceServiceFacade.Json = jsonPersistence
299299
}
300300

301301
return nil
@@ -630,7 +630,7 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro
630630
userConfig.Notifications = &NotificationConfig{}
631631
}
632632

633-
var persistence = PersistenceServiceFacade.Get()
633+
var persistence = persistenceServiceFacade.Get()
634634

635635
err := environ.setupInteraction(persistence)
636636
if err != nil {

pkg/bbgo/isolation.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,24 @@ package bbgo
22

33
import (
44
"context"
5+
6+
"github.com/c9s/bbgo/pkg/service"
57
)
68

79
const IsolationContextKey = "bbgo"
810

9-
var defaultIsolation *Isolation = nil
10-
11-
func init() {
12-
defaultIsolation = NewIsolation()
13-
}
11+
var defaultIsolation = NewIsolation()
1412

1513
type Isolation struct {
16-
gracefulShutdown GracefulShutdown
14+
gracefulShutdown GracefulShutdown
15+
persistenceServiceFacade *service.PersistenceServiceFacade
1716
}
1817

1918
func NewIsolation() *Isolation {
20-
return &Isolation{}
19+
return &Isolation{
20+
gracefulShutdown: GracefulShutdown{},
21+
persistenceServiceFacade: DefaultPersistenceServiceFacade,
22+
}
2123
}
2224

2325
func NewIsolationFromContext(ctx context.Context) *Isolation {
@@ -28,3 +30,11 @@ func NewIsolationFromContext(ctx context.Context) *Isolation {
2830

2931
return defaultIsolation
3032
}
33+
34+
func NewContextWithIsolation(parent context.Context, isolation *Isolation) context.Context {
35+
return context.WithValue(parent, IsolationContextKey, isolation)
36+
}
37+
38+
func NewContextWithDefaultIsolation(parent context.Context) context.Context {
39+
return context.WithValue(parent, IsolationContextKey, defaultIsolation)
40+
}

pkg/bbgo/persistence.go

+4-79
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package bbgo
22

33
import (
4-
"fmt"
4+
"context"
55
"reflect"
66

77
log "github.com/sirupsen/logrus"
@@ -10,96 +10,21 @@ import (
1010
"github.com/c9s/bbgo/pkg/service"
1111
)
1212

13-
type PersistenceSelector struct {
14-
// StoreID is the store you want to use.
15-
StoreID string `json:"store" yaml:"store"`
16-
17-
// Type is the persistence type
18-
Type string `json:"type" yaml:"type"`
19-
}
20-
2113
var DefaultPersistenceServiceFacade = &service.PersistenceServiceFacade{
2214
Memory: service.NewMemoryService(),
2315
}
2416

25-
var PersistenceServiceFacade = DefaultPersistenceServiceFacade
26-
27-
// Persistence is used for strategy to inject the persistence.
28-
type Persistence struct {
29-
PersistenceSelector *PersistenceSelector `json:"persistence,omitempty" yaml:"persistence,omitempty"`
30-
}
31-
32-
func (p *Persistence) backendService(t string) (service.PersistenceService, error) {
33-
switch t {
34-
case "json":
35-
return PersistenceServiceFacade.Json, nil
36-
37-
case "redis":
38-
if PersistenceServiceFacade.Redis == nil {
39-
log.Warn("redis persistence is not available, fallback to memory backend")
40-
return PersistenceServiceFacade.Memory, nil
41-
}
42-
return PersistenceServiceFacade.Redis, nil
43-
44-
case "memory":
45-
return PersistenceServiceFacade.Memory, nil
46-
47-
}
48-
49-
return nil, fmt.Errorf("unsupported persistent type %s", t)
50-
}
51-
52-
func (p *Persistence) Load(val interface{}, subIDs ...string) error {
53-
ps, err := p.backendService(p.PersistenceSelector.Type)
54-
if err != nil {
55-
return err
56-
}
57-
58-
log.Debugf("using persistence store %T for loading", ps)
59-
60-
if p.PersistenceSelector.StoreID == "" {
61-
p.PersistenceSelector.StoreID = "default"
62-
}
63-
64-
store := ps.NewStore(p.PersistenceSelector.StoreID, subIDs...)
65-
return store.Load(val)
66-
}
67-
68-
func (p *Persistence) Save(val interface{}, subIDs ...string) error {
69-
ps, err := p.backendService(p.PersistenceSelector.Type)
70-
if err != nil {
71-
return err
72-
}
73-
74-
log.Debugf("using persistence store %T for storing", ps)
75-
76-
if p.PersistenceSelector.StoreID == "" {
77-
p.PersistenceSelector.StoreID = "default"
78-
}
79-
80-
store := ps.NewStore(p.PersistenceSelector.StoreID, subIDs...)
81-
return store.Save(val)
82-
}
83-
84-
func (p *Persistence) Sync(obj interface{}) error {
85-
id := dynamic.CallID(obj)
86-
if len(id) == 0 {
87-
return nil
88-
}
89-
90-
ps := PersistenceServiceFacade.Get()
91-
return storePersistenceFields(obj, id, ps)
92-
}
17+
var persistenceServiceFacade = DefaultPersistenceServiceFacade
9318

9419
// Sync syncs the object properties into the persistence layer
95-
func Sync(obj interface{}) {
20+
func Sync(ctx context.Context, obj interface{}) {
9621
id := dynamic.CallID(obj)
9722
if len(id) == 0 {
9823
log.Warnf("InstanceID() is not provided, can not sync persistence")
9924
return
10025
}
10126

102-
ps := PersistenceServiceFacade.Get()
27+
ps := persistenceServiceFacade.Get()
10328
err := storePersistenceFields(obj, id, ps)
10429
if err != nil {
10530
log.WithError(err).Errorf("persistence sync failed")

pkg/bbgo/trader.go

+6-16
Original file line numberDiff line numberDiff line change
@@ -376,11 +376,11 @@ func (trader *Trader) LoadState() error {
376376
return nil
377377
}
378378

379-
if PersistenceServiceFacade == nil {
379+
if persistenceServiceFacade == nil {
380380
return nil
381381
}
382382

383-
ps := PersistenceServiceFacade.Get()
383+
ps := persistenceServiceFacade.Get()
384384

385385
log.Infof("loading strategies states...")
386386

@@ -413,11 +413,11 @@ func (trader *Trader) SaveState() error {
413413
return nil
414414
}
415415

416-
if PersistenceServiceFacade == nil {
416+
if persistenceServiceFacade == nil {
417417
return nil
418418
}
419419

420-
ps := PersistenceServiceFacade.Get()
420+
ps := persistenceServiceFacade.Get()
421421

422422
log.Infof("saving strategies states...")
423423
return trader.IterateStrategies(func(strategy StrategyID) error {
@@ -434,16 +434,7 @@ func (trader *Trader) Shutdown(ctx context.Context) {
434434
trader.gracefulShutdown.Shutdown(ctx)
435435
}
436436

437-
var defaultPersistenceSelector = &PersistenceSelector{
438-
StoreID: "default",
439-
Type: "memory",
440-
}
441-
442437
func (trader *Trader) injectCommonServices(s interface{}) error {
443-
persistence := &Persistence{
444-
PersistenceSelector: defaultPersistenceSelector,
445-
}
446-
447438
// a special injection for persistence selector:
448439
// if user defined the selector, the facade pointer will be nil, hence we need to update the persistence facade pointer
449440
sv := reflect.ValueOf(s).Elem()
@@ -455,7 +446,7 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
455446
return fmt.Errorf("field Persistence is not a struct element, %s given", field)
456447
}
457448

458-
if err := dynamic.InjectField(elem, "Facade", PersistenceServiceFacade, true); err != nil {
449+
if err := dynamic.InjectField(elem, "Facade", persistenceServiceFacade, true); err != nil {
459450
return err
460451
}
461452

@@ -475,7 +466,6 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
475466
trader.environment.DatabaseService,
476467
trader.environment.AccountService,
477468
trader.environment,
478-
persistence,
479-
PersistenceServiceFacade, // if the strategy use persistence facade separately
469+
persistenceServiceFacade, // if the strategy use persistence facade separately
480470
)
481471
}

pkg/notifier/telegramnotifier/logrus_look.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (t *LogHook) Fire(e *logrus.Entry) error {
3434
}
3535

3636
var message = fmt.Sprintf("[%s] %s", e.Level.String(), e.Message)
37-
if errData, ok := e.Data[logrus.ErrorKey]; ok {
37+
if errData, ok := e.Data[logrus.ErrorKey]; ok && errData != nil {
3838
if err, isErr := errData.(error); isErr {
3939
message += " Error: " + err.Error()
4040
}

pkg/strategy/audacitymaker/strategy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
111111
s.orderExecutor.BindProfitStats(s.ProfitStats)
112112
s.orderExecutor.BindTradeStats(s.TradeStats)
113113
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
114-
bbgo.Sync(s)
114+
bbgo.Sync(ctx, s)
115115
})
116116
s.orderExecutor.Bind()
117117
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)

pkg/strategy/bollmaker/strategy.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
516516
s.orderExecutor.BindProfitStats(s.ProfitStats)
517517
s.orderExecutor.Bind()
518518
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
519-
bbgo.Sync(s)
519+
bbgo.Sync(ctx, s)
520520
})
521521
s.ExitMethods.Bind(session, s.orderExecutor)
522522

@@ -531,7 +531,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
531531

532532
s.OnSuspend(func() {
533533
_ = s.orderExecutor.GracefulCancel(ctx)
534-
bbgo.Sync(s)
534+
bbgo.Sync(ctx, s)
535535
})
536536

537537
s.OnEmergencyStop(func() {

pkg/strategy/dca/strategy.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ func (b BudgetPeriod) Duration() time.Duration {
4747

4848
// Strategy is the Dollar-Cost-Average strategy
4949
type Strategy struct {
50-
5150
Environment *bbgo.Environment
5251
Symbol string `json:"symbol"`
5352
Market types.Market
@@ -110,7 +109,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
110109
s.orderExecutor.BindEnvironment(s.Environment)
111110
s.orderExecutor.BindProfitStats(s.ProfitStats)
112111
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
113-
bbgo.Sync(s)
112+
bbgo.Sync(ctx, s)
114113
})
115114
s.orderExecutor.Bind()
116115

pkg/strategy/drift/strategy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -795,7 +795,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
795795
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
796796
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
797797
s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
798-
bbgo.Sync(s)
798+
bbgo.Sync(ctx, s)
799799
})
800800
s.GeneralOrderExecutor.Bind()
801801

pkg/strategy/elliottwave/strategy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
310310
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
311311
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
312312
s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(p *types.Position) {
313-
bbgo.Sync(s)
313+
bbgo.Sync(ctx, s)
314314
})
315315
s.GeneralOrderExecutor.Bind()
316316

pkg/strategy/ewoDgtrd/strategy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
708708
s.orderExecutor.BindProfitStats(s.ProfitStats)
709709
// s.orderExecutor.BindTradeStats(s.TradeStats)
710710
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
711-
bbgo.Sync(s)
711+
bbgo.Sync(ctx, s)
712712
})
713713
s.orderExecutor.Bind()
714714

pkg/strategy/factorzoo/strategy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
109109
s.orderExecutor.BindProfitStats(s.ProfitStats)
110110
s.orderExecutor.BindTradeStats(s.TradeStats)
111111
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
112-
bbgo.Sync(s)
112+
bbgo.Sync(ctx, s)
113113
})
114114
s.orderExecutor.Bind()
115115
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)

pkg/strategy/fmaker/strategy.go

-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ type IntervalWindowSetting struct {
3232
}
3333

3434
type Strategy struct {
35-
*bbgo.Persistence
36-
3735
Environment *bbgo.Environment
3836
Symbol string `json:"symbol"`
3937
Market types.Market

0 commit comments

Comments
 (0)