Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: [core] add syncBufferPeriod config and set default to -30 mins #1659

Merged
merged 1 commit into from
Jun 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/bbgo/config.go
Original file line number Diff line number Diff line change
@@ -346,6 +346,8 @@ type EnvironmentConfig struct {
DisableMarketDataStore bool `json:"disableMarketDataStore"`

MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"`

SyncBufferPeriod *types.Duration `json:"syncBufferPeriod"`
}

type Config struct {
14 changes: 13 additions & 1 deletion pkg/bbgo/environment.go
Original file line number Diff line number Diff line change
@@ -37,6 +37,8 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

var defaultSyncBufferPeriod = 30 * time.Minute

// IsBackTesting is a global variable that indicates the current environment is back-test or not.
var IsBackTesting = false

@@ -645,7 +647,17 @@ func (environ *Environment) syncSession(

log.Infof("syncing symbols %v from session %s", symbols, session.Name)

return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, syncStartTime, symbols...)
syncBufferPeriod := -defaultSyncBufferPeriod
if environ.environmentConfig.SyncBufferPeriod != nil {
syncBufferPeriod = -environ.environmentConfig.SyncBufferPeriod.Duration()
}

if syncBufferPeriod > 0 {
log.Warnf("syncBufferPeriod should be a negative number, given: %d", syncBufferPeriod)
}

syncEndTime := time.Now().Add(syncBufferPeriod)
return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, syncStartTime, syncEndTime, symbols...)
}

func (environ *Environment) ConfigureNotificationSystem(ctx context.Context, userConfig *Config) error {
7 changes: 5 additions & 2 deletions pkg/service/order.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,10 @@ type OrderService struct {
DB *sqlx.DB
}

func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
func (s *OrderService) Sync(
ctx context.Context, exchange types.Exchange, symbol string,
startTime, endTime time.Time,
) error {
isMargin, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
@@ -77,7 +80,7 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol
}

for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
return err
}
}
8 changes: 5 additions & 3 deletions pkg/service/sync.go
Original file line number Diff line number Diff line change
@@ -27,7 +27,9 @@ type SyncService struct {

// SyncSessionSymbols syncs the trades from the given exchange session
func (s *SyncService) SyncSessionSymbols(
ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string,
ctx context.Context, exchange types.Exchange,
startTime, endTime time.Time,
symbols ...string,
) error {
markets, err := cache.LoadExchangeMarketsWithCache(ctx, exchange)
if err != nil {
@@ -41,12 +43,12 @@ func (s *SyncService) SyncSessionSymbols(
}

log.Infof("syncing %s %s trades from %s...", exchange.Name(), symbol, startTime)
if err := s.TradeService.Sync(ctx, exchange, symbol, startTime); err != nil {
if err := s.TradeService.Sync(ctx, exchange, symbol, startTime, endTime); err != nil {
return err
}

log.Infof("syncing %s %s orders from %s...", exchange.Name(), symbol, startTime)
if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil {
if err := s.OrderService.Sync(ctx, exchange, symbol, startTime, endTime); err != nil {
return err
}
}
9 changes: 6 additions & 3 deletions pkg/service/sync_task.go
Original file line number Diff line number Diff line change
@@ -54,7 +54,10 @@ type SyncTask struct {
LogInsert bool
}

func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time, args ...time.Time) error {
func (sel SyncTask) execute(
ctx context.Context,
db *sqlx.DB, startTime time.Time, endTimeArgs ...time.Time,
) error {
batchBufferRefVal := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer)

// query from db
@@ -84,8 +87,8 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
startTime = lastRecordTime(sel, recordSliceRef, startTime)

endTime := time.Now()
if len(args) > 0 {
endTime = args[0]
if len(endTimeArgs) > 0 {
endTime = endTimeArgs[0]
}

// asset "" means all assets
8 changes: 6 additions & 2 deletions pkg/service/trade.go
Original file line number Diff line number Diff line change
@@ -58,7 +58,11 @@ func NewTradeService(db *sqlx.DB) *TradeService {
return &TradeService{db}
}

func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
func (s *TradeService) Sync(
ctx context.Context,
exchange types.Exchange, symbol string,
startTime, endTime time.Time,
) error {
isMargin, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
@@ -106,7 +110,7 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
}

for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
return err
}
}

Unchanged files with check annotations Beta

package main

Check warning on line 1 in cmd/update-doc/main.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"fmt"
package main

Check warning on line 1 in examples/max-rewards/main.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"context"
package glassnode

Check warning on line 1 in pkg/datasource/glassnode/datasource.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"context"
package main

Check warning on line 1 in cmd/bbgo-webview/main.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"context"
"time"
"github.com/joho/godotenv"
"github.com/webview/webview"

Check failure on line 14 in cmd/bbgo-webview/main.go

GitHub Actions / lint

could not import github.com/webview/webview (-: go build github.com/webview/webview:
log "github.com/sirupsen/logrus"
package main

Check warning on line 1 in cmd/bbgo-lorca/main.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"context"
package glassnodeapi

Check warning on line 1 in pkg/datasource/glassnode/glassnodeapi/client.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"context"
package main

Check warning on line 1 in cmd/bbgo/main.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"github.com/c9s/bbgo/pkg/cmd"
package datatype

Check warning on line 1 in pkg/datatype/string_slice.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"encoding/json"
package main

Check warning on line 1 in examples/max-withdraw/main.go

GitHub Actions / build (6.2, 1.21)

should have a package comment
import (
"context"
)
// WebsocketClientBase is a legacy base client
// Deprecated: please use standard stream instead.

Check failure on line 12 in pkg/net/websocketbase/client.go

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
//go:generate callbackgen -type WebsocketClientBase
type WebsocketClientBase struct {
baseURL string
func UnixMilli() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}

Check failure on line 16 in pkg/util/time.go

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
func TestMergeMigrationsMap(t *testing.T) {
MergeMigrationsMap(map[rockhopper.RegistryKey]*rockhopper.Migration{
rockhopper.RegistryKey{Version: 2}: &rockhopper.Migration{},

Check failure on line 18 in pkg/migrations/sqlite3/migration_api_test.go

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
rockhopper.RegistryKey{Version: 2}: &rockhopper.Migration{},
})
}