Skip to content

Commit 4833d56

Browse files
authored
Merge pull request #442 from pikers/misc_brokerd_backend_repairs
Misc brokerd backend repairs
2 parents 8d1eb81 + 090d1ba commit 4833d56

File tree

12 files changed

+159
-99
lines changed

12 files changed

+159
-99
lines changed

.github/workflows/ci.yml

+1-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@ name: CI
33

44
on:
55
# Triggers the workflow on push or pull request events but only for the master branch
6-
push:
7-
branches: [ master ]
86
pull_request:
7+
push:
98
branches: [ master ]
109

1110
# Allows you to run this workflow manually from the Actions tab

dockering/ib/docker-compose.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ services:
88
# https://github.com/waytrade/ib-gateway-docker#supported-tags
99
# image: waytrade/ib-gateway:981.3j
1010
image: waytrade/ib-gateway:1012.2i
11-
restart: always # restart whenev there's a crash or user clicsk
11+
restart: 'no' # restart on boot whenev there's a crash or user clicsk
1212
network_mode: 'host'
1313

1414
volumes:
@@ -64,7 +64,7 @@ services:
6464

6565
# ib_gw_live:
6666
# image: waytrade/ib-gateway:1012.2i
67-
# restart: always
67+
# restart: no
6868
# network_mode: 'host'
6969

7070
# volumes:

piker/brokers/binance.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -519,14 +519,15 @@ async def subscribe(ws: wsproto.WSConnection):
519519
subs.append("{sym}@bookTicker")
520520

521521
# unsub from all pairs on teardown
522-
await ws.send_msg({
523-
"method": "UNSUBSCRIBE",
524-
"params": subs,
525-
"id": uid,
526-
})
527-
528-
# XXX: do we need to ack the unsub?
529-
# await ws.recv_msg()
522+
if ws.connected():
523+
await ws.send_msg({
524+
"method": "UNSUBSCRIBE",
525+
"params": subs,
526+
"id": uid,
527+
})
528+
529+
# XXX: do we need to ack the unsub?
530+
# await ws.recv_msg()
530531

531532
async with open_autorecon_ws(
532533
'wss://stream.binance.com/ws',

piker/brokers/deribit/feed.py

-15
Original file line numberDiff line numberDiff line change
@@ -94,21 +94,6 @@ async def get_ohlc(
9494
yield get_ohlc, {'erlangs': 3, 'rate': 3}
9595

9696

97-
async def backfill_bars(
98-
symbol: str,
99-
shm: ShmArray, # type: ignore # noqa
100-
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
101-
) -> None:
102-
"""Fill historical bars into shared mem / storage afap.
103-
"""
104-
instrument = symbol
105-
with trio.CancelScope() as cs:
106-
async with open_cached_client('deribit') as client:
107-
bars = await client.bars(instrument)
108-
shm.push(bars)
109-
task_status.started(cs)
110-
111-
11297
async def stream_quotes(
11398

11499
send_chan: trio.abc.SendChannel,

piker/brokers/ib/api.py

+16
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ def __init__(self):
162162
'CMECRYPTO',
163163
'COMEX',
164164
'CMDTY', # special name case..
165+
'CBOT', # (treasury) yield futures
165166
)
166167

167168
_adhoc_futes_set = {
@@ -197,6 +198,21 @@ def __init__(self):
197198
'xagusd.cmdty', # silver spot
198199
'ni.comex', # silver futes
199200
'qi.comex', # mini-silver futes
201+
202+
# treasury yields
203+
# etfs by duration:
204+
# SHY -> IEI -> IEF -> TLT
205+
'zt.cbot', # 2y
206+
'z3n.cbot', # 3y
207+
'zf.cbot', # 5y
208+
'zn.cbot', # 10y
209+
'zb.cbot', # 30y
210+
211+
# (micros of above)
212+
'2yy.cbot',
213+
'5yy.cbot',
214+
'10y.cbot',
215+
'30y.cbot',
200216
}
201217

202218

piker/brokers/ib/broker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ async def trades_dialogue(
611611
pp = table.pps[bsuid]
612612
if msg.size != pp.size:
613613
log.error(
614-
'Position mismatch {pp.symbol.front_fqsn()}:\n'
614+
f'Position mismatch {pp.symbol.front_fqsn()}:\n'
615615
f'ib: {msg.size}\n'
616616
f'piker: {pp.size}\n'
617617
)

piker/brokers/ib/feed.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,10 @@ async def open_history_client(
135135
# fx cons seem to not provide this endpoint?
136136
'idealpro' not in fqsn
137137
):
138-
head_dt = await proxy.get_head_time(fqsn=fqsn)
138+
try:
139+
head_dt = await proxy.get_head_time(fqsn=fqsn)
140+
except RequestError:
141+
head_dt = None
139142

140143
async def get_hist(
141144
timeframe: float,

piker/brokers/kraken/api.py

-4
Original file line numberDiff line numberDiff line change
@@ -510,10 +510,6 @@ def normalize_symbol(
510510
511511
'''
512512
ticker = cls._ntable[ticker]
513-
symlen = len(ticker)
514-
if symlen != 6:
515-
raise ValueError(f'Unhandled symbol: {ticker}')
516-
517513
return ticker.lower()
518514

519515

piker/brokers/kraken/broker.py

+76-26
Original file line numberDiff line numberDiff line change
@@ -413,19 +413,26 @@ async def trades_dialogue(
413413

414414
) -> AsyncIterator[dict[str, Any]]:
415415

416-
# XXX: required to propagate ``tractor`` loglevel to piker logging
416+
# XXX: required to propagate ``tractor`` loglevel to ``piker`` logging
417417
get_console_log(loglevel or tractor.current_actor().loglevel)
418418

419419
async with get_client() as client:
420420

421+
if not client._api_key:
422+
raise RuntimeError(
423+
'Missing Kraken API key in `brokers.toml`!?!?')
424+
421425
# TODO: make ems flip to paper mode via
422426
# some returned signal if the user only wants to use
423427
# the data feed or we return this?
424-
# await ctx.started(({}, ['paper']))
428+
# else:
429+
# await ctx.started(({}, ['paper']))
425430

426-
if not client._api_key:
427-
raise RuntimeError(
428-
'Missing Kraken API key in `brokers.toml`!?!?')
431+
# NOTE: currently we expect the user to define a "source fiat"
432+
# (much like the web UI let's you set an "account currency")
433+
# such that all positions (nested or flat) will be translated to
434+
# this source currency's terms.
435+
src_fiat = client.conf['src_fiat']
429436

430437
# auth required block
431438
acctid = client._name
@@ -444,10 +451,9 @@ async def trades_dialogue(
444451
# NOTE: testing code for making sure the rt incremental update
445452
# of positions, via newly generated msgs works. In order to test
446453
# this,
447-
# - delete the *ABSOLUTE LAST* entry from accont's corresponding
454+
# - delete the *ABSOLUTE LAST* entry from account's corresponding
448455
# trade ledgers file (NOTE this MUST be the last record
449-
# delivered from the
450-
# api ledger),
456+
# delivered from the api ledger),
451457
# - open you ``pps.toml`` and find that same tid and delete it
452458
# from the pp's clears table,
453459
# - set this flag to `True`
@@ -486,40 +492,84 @@ async def trades_dialogue(
486492
# and do diff with ledger to determine
487493
# what amount of trades-transactions need
488494
# to be reloaded.
489-
sizes = await client.get_balances()
490-
for dst, size in sizes.items():
495+
balances = await client.get_balances()
496+
for dst, size in balances.items():
491497
# we don't care about tracking positions
492498
# in the user's source fiat currency.
493-
if dst == client.conf['src_fiat']:
499+
if dst == src_fiat:
494500
continue
495501

496-
def has_pp(dst: str) -> Position | bool:
497-
pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps}
498-
pair = pps_dst_assets.get(dst)
499-
pp = table.pps.get(pair)
502+
def has_pp(
503+
dst: str,
504+
size: float,
500505

501-
if (
502-
not pair or not pp
503-
or not math.isclose(pp.size, size)
504-
):
505-
return False
506+
) -> Position | bool:
506507

507-
return pp
508+
src2dst: dict[str, str] = {}
509+
for bsuid in table.pps:
510+
try:
511+
dst_name_start = bsuid.rindex(src_fiat)
512+
except (
513+
ValueError, # substr not found
514+
):
515+
# TODO: handle nested positions..(i.e.
516+
# positions where the src fiat was used to
517+
# buy some other dst which was furhter used
518+
# to buy another dst..)
519+
log.warning(
520+
f'No src fiat {src_fiat} found in {bsuid}?'
521+
)
522+
continue
508523

509-
pos = has_pp(dst)
524+
_dst = bsuid[:dst_name_start]
525+
if _dst != dst:
526+
continue
527+
528+
src2dst[src_fiat] = dst
529+
530+
for src, dst in src2dst.items():
531+
pair = f'{dst}{src_fiat}'
532+
pp = table.pps.get(pair)
533+
if (
534+
pp
535+
and math.isclose(pp.size, size)
536+
):
537+
return pp
538+
539+
elif (
540+
size == 0
541+
and pp.size
542+
):
543+
log.warning(
544+
f'`kraken` account says you have a ZERO '
545+
f'balance for {bsuid}:{pair}\n'
546+
f'but piker seems to think `{pp.size}`\n'
547+
'This is likely a discrepancy in piker '
548+
'accounting if the above number is'
549+
"large,' though it's likely to due lack"
550+
"f tracking xfers fees.."
551+
)
552+
return pp
553+
554+
return False
555+
556+
pos = has_pp(dst, size)
510557
if not pos:
511558

512559
# we have a balance for which there is no pp
513560
# entry? so we have to likely update from the
514561
# ledger.
515562
updated = table.update_from_trans(ledger_trans)
516563
log.info(f'Updated pps from ledger:\n{pformat(updated)}')
517-
pos = has_pp(dst)
564+
pos = has_pp(dst, size)
518565

519-
if not pos and not simulate_pp_update:
566+
if (
567+
not pos
568+
and not simulate_pp_update
569+
):
520570
# try reloading from API
521571
table.update_from_trans(api_trans)
522-
pos = has_pp(dst)
572+
pos = has_pp(dst, size)
523573
if not pos:
524574

525575
# get transfers to make sense of abs balances.
@@ -557,7 +607,7 @@ def has_pp(dst: str) -> Position | bool:
557607
f'{pformat(updated)}'
558608
)
559609

560-
if not has_pp(dst):
610+
if has_pp(dst, size):
561611
raise ValueError(
562612
'Could not reproduce balance:\n'
563613
f'dst: {dst}, {size}\n'

piker/brokers/kraken/feed.py

+9-26
Original file line numberDiff line numberDiff line change
@@ -303,24 +303,6 @@ async def get_ohlc(
303303
yield get_ohlc, {'erlangs': 1, 'rate': 1}
304304

305305

306-
async def backfill_bars(
307-
308-
sym: str,
309-
shm: ShmArray, # type: ignore # noqa
310-
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
311-
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
312-
313-
) -> None:
314-
'''
315-
Fill historical bars into shared mem / storage afap.
316-
'''
317-
with trio.CancelScope() as cs:
318-
async with open_cached_client('kraken') as client:
319-
bars = await client.bars(symbol=sym)
320-
shm.push(bars)
321-
task_status.started(cs)
322-
323-
324306
async def stream_quotes(
325307

326308
send_chan: trio.abc.SendChannel,
@@ -419,14 +401,15 @@ async def subscribe(ws: NoBsWs):
419401
yield
420402

421403
# unsub from all pairs on teardown
422-
await ws.send_msg({
423-
'pair': list(ws_pairs.values()),
424-
'event': 'unsubscribe',
425-
'subscription': ['ohlc', 'spread'],
426-
})
427-
428-
# XXX: do we need to ack the unsub?
429-
# await ws.recv_msg()
404+
if ws.connected():
405+
await ws.send_msg({
406+
'pair': list(ws_pairs.values()),
407+
'event': 'unsubscribe',
408+
'subscription': ['ohlc', 'spread'],
409+
})
410+
411+
# XXX: do we need to ack the unsub?
412+
# await ws.recv_msg()
430413

431414
# see the tips on reconnection logic:
432415
# https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds

0 commit comments

Comments
 (0)