Skip to content

Commit

Permalink
Event log deltas.
Browse files Browse the repository at this point in the history
  • Loading branch information
martonp committed Jul 3, 2024
1 parent cb83279 commit 4db0e05
Show file tree
Hide file tree
Showing 7 changed files with 950 additions and 433 deletions.
141 changes: 108 additions & 33 deletions client/mm/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,10 @@ type WithdrawalEvent struct {

// MarketMakingEvent represents an action that a market making bot takes.
type MarketMakingEvent struct {
ID uint64 `json:"id"`
TimeStamp int64 `json:"timestamp"`
BaseDelta int64 `json:"baseDelta"`
QuoteDelta int64 `json:"quoteDelta"`
BaseFees uint64 `json:"baseFees"`
QuoteFees uint64 `json:"quoteFees"`
Pending bool `json:"pending"`
ID uint64 `json:"id"`
TimeStamp int64 `json:"timestamp"`
Pending bool `json:"pending"`
BalanceEffects *BalanceEffects `json:"balanceEffects,omitempty"`

// Only one of the following will be populated.
DEXOrderEvent *DEXOrderEvent `json:"dexOrderEvent,omitempty"`
Expand Down Expand Up @@ -194,6 +191,56 @@ func newBoltEventLogDB(ctx context.Context, path string, log dex.Logger) (*boltE
return eventLogDB, nil
}

func calcFinalStateBasedOnEventDiff(runBucket, eventsBucket *bbolt.Bucket, eventKey []byte, newEvent *MarketMakingEvent) (*BalanceState, error) {
finalStateB := runBucket.Get(finalStateKey)
if finalStateB == nil {
return nil, fmt.Errorf("no final state found")
}

finalState, err := decodeFinalState(finalStateB)
if err != nil {
return nil, err
}

originalEventB := eventsBucket.Get(eventKey)
if originalEventB == nil {
return nil, fmt.Errorf("no original event found")
}

originalEvent, err := decodeMarketMakingEvent(originalEventB)
if err != nil {
return nil, err
}

applyDiff := func(curr uint64, diff int64) uint64 {
if diff > 0 {
return curr + uint64(diff)
}

if curr < uint64(-diff) {
return 0
}

return curr + uint64(diff)
}

balanceEffectDiff := newEvent.BalanceEffects.sub(originalEvent.BalanceEffects)
for assetID, diff := range balanceEffectDiff.settled {
finalState.Balances[assetID].Available = applyDiff(finalState.Balances[assetID].Available, diff)
}
for assetID, diff := range balanceEffectDiff.pending {
finalState.Balances[assetID].Pending = applyDiff(finalState.Balances[assetID].Pending, diff)
}
for assetID, diff := range balanceEffectDiff.locked {
finalState.Balances[assetID].Locked = applyDiff(finalState.Balances[assetID].Locked, diff)
}
for assetID, diff := range balanceEffectDiff.reserved {
finalState.Balances[assetID].Reserved = applyDiff(finalState.Balances[assetID].Reserved, diff)
}

return finalState, nil
}

// updateEvent is called for each event that is popped off the updateEvent. If
// the event already exists, it is updated. If it does not exist, it is added.
// The stats for the run are also updated based on the event.
Expand All @@ -204,6 +251,7 @@ func (db *boltEventLogDB) updateEvent(update *eventUpdate) {
if runBucket == nil {
return fmt.Errorf("nil run bucket for key %x", update.runKey)
}

eventsBkt, err := runBucket.CreateBucketIfNotExists(eventsBucket)
if err != nil {
return err
Expand All @@ -215,6 +263,14 @@ func (db *boltEventLogDB) updateEvent(update *eventUpdate) {
}
eventB := versionedBytes(0).AddData(eventJSON)

bs := update.bs
if bs == nil {
bs, err = calcFinalStateBasedOnEventDiff(runBucket, eventsBkt, eventKey, update.e)
if err != nil {
return err
}
}

if err := eventsBkt.Put(eventKey, eventB); err != nil {
return err
}
Expand All @@ -226,7 +282,7 @@ func (db *boltEventLogDB) updateEvent(update *eventUpdate) {
}

// Update the final state.
bsJSON, err := json.Marshal(update.bs)
bsJSON, err := json.Marshal(bs)
if err != nil {
return err
}
Expand Down Expand Up @@ -429,6 +485,25 @@ func (db *boltEventLogDB) runs(n uint64, refStartTime *uint64, refMkt *MarketWit
return runs, nil
}

func decodeFinalState(finalStateB []byte) (*BalanceState, error) {
finalState := new(BalanceState)
ver, pushes, err := encode.DecodeBlob(finalStateB)
if err != nil {
return nil, err
}
if ver != 0 {
return nil, fmt.Errorf("unknown final state version %d", ver)
}
if len(pushes) != 1 {
return nil, fmt.Errorf("expected 1 push for final state, got %d", len(pushes))
}
err = json.Unmarshal(pushes[0], finalState)
if err != nil {
return nil, err
}
return finalState, nil
}

// runOverview returns overview information about a run, not including the
// events that took place.
func (db *boltEventLogDB) runOverview(startTime int64, mkt *MarketWithHost) (*MarketMakingRunOverview, error) {
Expand Down Expand Up @@ -459,18 +534,11 @@ func (db *boltEventLogDB) runOverview(startTime int64, mkt *MarketWithHost) (*Ma
}

finalStateB := runBucket.Get(finalStateKey)
finalState := new(BalanceState)
ver, pushes, err = encode.DecodeBlob(finalStateB)
if err != nil {
return err
}
if ver != 0 {
return fmt.Errorf("unknown final state version %d", ver)
if finalStateB == nil {
return fmt.Errorf("no final state found")
}
if len(pushes) != 1 {
return fmt.Errorf("expected 1 push for final state, got %d", len(pushes))
}
err = json.Unmarshal(pushes[0], finalState)

finalState, err := decodeFinalState(finalStateB)
if err != nil {
return err
}
Expand Down Expand Up @@ -527,6 +595,25 @@ func (db *boltEventLogDB) storeEvent(startTime int64, mkt *MarketWithHost, e *Ma
}
}

func decodeMarketMakingEvent(eventB []byte) (*MarketMakingEvent, error) {
e := new(MarketMakingEvent)
ver, pushes, err := encode.DecodeBlob(eventB)
if err != nil {
return nil, err
}
if ver != 0 {
return nil, fmt.Errorf("unknown version %d", ver)
}
if len(pushes) != 1 {
return nil, fmt.Errorf("expected 1 push for event, got %d", len(pushes))
}
err = json.Unmarshal(pushes[0], e)
if err != nil {
return nil, err
}
return e, nil
}

// runEvents returns the events that took place during a run. If n == 0, all of the
// events will be returned. If refID is not nil, the events including and after the
// event with the ID will be returned. If pendingOnly is true, only pending events
Expand Down Expand Up @@ -555,19 +642,7 @@ func (db *boltEventLogDB) runEvents(startTime int64, mkt *MarketWithHost, n uint
}

for ; k != nil; k, v = cursor.Prev() {
ver, pushes, err := encode.DecodeBlob(v)
if err != nil {
return err
}
if ver != 0 {
return fmt.Errorf("unknown version %d", ver)
}
if len(pushes) != 1 {
return fmt.Errorf("expected 1 push for event, got %d", len(pushes))
}

var e MarketMakingEvent
err = json.Unmarshal(pushes[0], &e)
e, err := decodeMarketMakingEvent(v)
if err != nil {
return err
}
Expand All @@ -576,7 +651,7 @@ func (db *boltEventLogDB) runEvents(startTime int64, mkt *MarketWithHost, n uint
continue
}

events = append(events, &e)
events = append(events, e)
if n > 0 && uint64(len(events)) >= n {
break
}
Expand Down
48 changes: 28 additions & 20 deletions client/mm/event_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,17 @@ func TestEventLogDB(t *testing.T) {
}

event1 := &MarketMakingEvent{
ID: 1,
TimeStamp: startTime + 1,
BaseDelta: 1e6,
QuoteDelta: -2e6,
BaseFees: 200,
QuoteFees: 100,
Pending: true,
ID: 1,
TimeStamp: startTime + 1,
BalanceEffects: &BalanceEffects{
Settled: map[uint32]int64{
42: 1e6,
},
Locked: map[uint32]uint64{
60: 2e6,
},
},
Pending: true,
DEXOrderEvent: &DEXOrderEvent{
ID: "order1",
Rate: 5e7,
Expand All @@ -147,17 +151,23 @@ func TestEventLogDB(t *testing.T) {
},
}

currBals[42].Available += uint64(event1.BaseDelta) - event1.BaseFees
currBals[42].Available += 2e6
currBals[42].Pending += 1e6
currBals[60].Available += uint64(event1.QuoteDelta) - event1.QuoteFees
currBals[60].Available += 8e6
db.storeEvent(startTime, mkt, event1, currBalanceState())

event2 := &MarketMakingEvent{
ID: 2,
TimeStamp: startTime + 1,
BaseDelta: 1e6,
QuoteDelta: -2e6,
Pending: true,
ID: 2,
TimeStamp: startTime + 1,
BalanceEffects: &BalanceEffects{
Settled: map[uint32]int64{
42: 3e6,
},
Locked: map[uint32]uint64{
60: 4e6,
},
},
Pending: true,
CEXOrderEvent: &CEXOrderEvent{
ID: "order1",
Rate: 5e7,
Expand All @@ -167,8 +177,8 @@ func TestEventLogDB(t *testing.T) {
QuoteFilled: 2e6,
},
}
currBals[42].Available += uint64(event2.BaseDelta)
currBals[60].Available += uint64(event2.QuoteDelta)
currBals[42].Available += 3e6
currBals[60].Available += 4e6
currBals[42].Pending += 1e6
db.storeEvent(startTime, mkt, event2, currBalanceState())

Expand Down Expand Up @@ -240,10 +250,8 @@ func TestEventLogDB(t *testing.T) {
}

// Update event1 and fiat rates
event1.BaseDelta += 100
event1.QuoteDelta -= 200
event1.BaseFees += 20
event1.QuoteFees += 10
event1.BalanceEffects.Settled[42] += 100
event1.BalanceEffects.Locked[60] -= 100
event1.Pending = false
currBals[42].Available += 100 - 20
currBals[60].Available -= 200 + 10
Expand Down
Loading

0 comments on commit 4db0e05

Please sign in to comment.