Questdb backend does not work with multiprocessing option #1041

Open
mirageAlchemy opened this issue Jun 8, 2024 · 2 comments
@mirageAlchemy

Describe the bug
When config={"backend_multiprocessing": True} is set in the feed definition, the QuestDB backend crashes with an AttributeError.

To Reproduce
Steps to reproduce the behavior:

  • Provide small code sample that reproduces the issue
  • Provide any tracebacks, if applicable
"""
Copyright (C) 2017-2024 Bryant Moscon - [email protected]

Please see the LICENSE file for the terms and conditions
associated with this software.
"""

from cryptofeed import FeedHandler
from cryptofeed.defines import L2_BOOK, TICKER, TRADES, OPEN_INTEREST, L3_BOOK
from cryptofeed.exchanges import Coinbase, BinanceFutures, Binance, Kraken, KrakenFutures
from cryptofeed.raw_data_collection import AsyncFileCallback

from cryptofeed.backends.quest import BookQuest, OpenInterestQuest, TradeQuest

QUEST_HOST = "127.0.0.1"
QUEST_PORT = 9009


def main():
    config = {"log": {"filename": "redis-demo.log", "level": "INFO"}, "backend_multiprocessing": True}
    f = FeedHandler(config=config)
    f.add_feed(
        Binance(
            max_depth=10,
            symbols=["BTC-USDT"],
            channels=[L2_BOOK, TRADES],
            callbacks={
                L2_BOOK: BookQuest(host=QUEST_HOST, port=QUEST_PORT),
                TRADES: TradeQuest(host=QUEST_HOST, port=QUEST_PORT),
            },
            config={"backend_multiprocessing": True},
        )
    )
    f.run()


if __name__ == "__main__":
    main()

Traceback:

  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/connection_handler.py", line 69, in _create_connection
    await self._handler(connection, self.handler)
  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/connection_handler.py", line 119, in _handler
    await handler(message, connection, self.conn.last_message)
  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/exchanges/binance.py", line 518, in message_handler
    await self._trade(msg, timestamp)
  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/exchanges/binance.py", line 195, in _trade
    await self.callback(TRADES, t, timestamp)
  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/feed.py", line 258, in callback
    await cb(obj, receipt_timestamp)
  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/backends/backend.py", line 97, in __call__
    await self.write(data)
  File "/home/ubuntu/miniforge3/lib/python3.10/site-packages/cryptofeed/backends/quest.py", line 63, in write
    await self.queue.put(update)
AttributeError: 'tuple' object has no attribute 'put'
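
The message is consistent with self.queue holding a pair of pipe endpoints rather than an asyncio.Queue. The sketch below reproduces the same error outside cryptofeed. It assumes the multiprocessing path builds the queue with multiprocessing.Pipe(duplex=False); that detail is not shown in this report, and put_on_pipe_pair is a hypothetical name used only for illustration.

```python
import asyncio
from multiprocessing import Pipe


async def put_on_pipe_pair(update: str) -> str:
    # Assumption: with backend_multiprocessing enabled, self.queue is the
    # (receive_end, send_end) tuple returned by Pipe(duplex=False).
    queue = Pipe(duplex=False)
    try:
        # Same call shape as quest.py line 63 in the traceback above.
        await queue.put(update)
    except AttributeError as exc:
        return str(exc)
    finally:
        for end in queue:
            end.close()
    return "no error"


error_message = asyncio.run(put_on_pipe_pair("example-row"))
print(error_message)
```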

Expected behavior
The QuestDB backend should write updates without raising an exception when backend_multiprocessing is enabled.

Screenshots
If applicable, add screenshots to help explain your problem.

Operating System:

  • macOS, linux, etc

Cryptofeed Version

  • 2.4.0
@mirageAlchemy

After replacing every await self.queue.put(update) in quest.py with

        if self.multiprocess:
            self.queue[1].send(update)
        else:
            await self.queue.put(update)

the problem is gone. I noticed that for most other backend handlers this branch lives in the base class, but BookQuest defines its own __call__ and write, and those overrides do not include the branch.
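
For anyone who wants to try the branch in isolation, here is a minimal sketch of the same dispatch. QueueWriter is a hypothetical stand-in, not cryptofeed's actual class, and it assumes the queue is a Pipe(duplex=False) pair in multiprocessing mode and an asyncio.Queue otherwise.

```python
import asyncio
from multiprocessing import Pipe


class QueueWriter:
    """Hypothetical stand-in for a backend callback; not cryptofeed code."""

    def __init__(self, multiprocess: bool):
        self.multiprocess = multiprocess
        # Assumption: a (receive, send) pipe pair in multiprocessing mode,
        # an asyncio.Queue in the default single-process mode.
        self.queue = Pipe(duplex=False) if multiprocess else asyncio.Queue()

    async def write(self, update: str) -> None:
        if self.multiprocess:
            # Index 1 is the send end of the pair.
            self.queue[1].send(update)
        else:
            await self.queue.put(update)


async def demo() -> tuple:
    piped = QueueWriter(multiprocess=True)
    queued = QueueWriter(multiprocess=False)
    await piped.write("row-a")
    await queued.write("row-b")
    # Read each update back from the matching consumer side.
    received = (piped.queue[0].recv(), await queued.queue.get())
    for end in piped.queue:
        end.close()
    return received


results = asyncio.run(demo())
print(results)
```

In this sketch the receive end is read in the same process only to keep the example short; in multiprocessing mode a separate worker process would normally hold that end.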

@divyanshu-in

@bmoscon I'm looking into it
