
How to properly delete a stream completely from the manager? #307

Closed
oliver-zehentleitner opened this issue Mar 4, 2023 Discussed in #279 · 5 comments
Labels: enhancement (New feature or request), question (Further information is requested)


@oliver-zehentleitner
Member

Discussed in #279

Originally posted by Scoooooba June 8, 2022
Hi Guys,

Given the following simple code:

from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
websocket_manager = BinanceWebSocketApiManager()
stream_id = websocket_manager.create_stream('depth', 'BNBBUSD')

When it is executed, entries for this stream_id are created in at least the following dictionaries (there may be more; these are the ones I was able to find quickly):

websocket_manager.specific_process_stream_data
websocket_manager.stream_threading_lock
websocket_manager.stream_list
websocket_manager.event_loops
websocket_manager.socket_is_ready
websocket_manager.stream_threads
websocket_manager.websocket_list

After stopping the stream with websocket_manager.stop_stream(stream_id), the entries for this stream_id still exist in these dictionaries. There is a function, websocket_manager.delete_stream_from_stream_list(stream_id), that removes the stream_id entry from the websocket_manager.stream_list dictionary, but as far as I know there is no function that deletes the stream_id from all the other dictionaries.

Wouldn't it make sense to implement one or more functions that remove a stream_id completely from the websocket_manager, so the manager can be properly cleaned up? Otherwise these dictionaries will keep growing if many streams are created and stopped over time.

Best regards
Sebastian
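One way the cleanup requested above could work is to keep the names of all per-stream dictionaries in a single tuple and delete the stream_id from each of them, so no dictionary can be forgotten. The sketch below uses a hypothetical ToyStreamManager whose attribute names mirror the list above; the method name remove_all_data_of_stream_id is borrowed from the method mentioned later in this thread, but the body is an assumption and not the library's implementation.

```python
import threading
from typing import Any, Callable, Dict


class ToyStreamManager:
    """Hypothetical stand-in for a manager that keeps per-stream state in
    several dicts keyed by stream_id. Not the library's code."""

    # Every per-stream dictionary, listed in one place so cleanup covers all.
    PER_STREAM_DICTS = (
        "specific_process_stream_data",
        "stream_threading_lock",
        "stream_list",
        "event_loops",
        "socket_is_ready",
        "stream_threads",
        "websocket_list",
    )

    def __init__(self) -> None:
        for name in self.PER_STREAM_DICTS:
            setattr(self, name, {})

    def create_stream(self, stream_id: str, callback: Callable[[Any], None]) -> None:
        # Placeholder values; a real manager would store loops, threads, sockets.
        self.specific_process_stream_data[stream_id] = callback
        self.stream_threading_lock[stream_id] = {"full_lock": threading.Lock()}
        self.stream_list[stream_id] = {"status": "running"}
        self.event_loops[stream_id] = None
        self.socket_is_ready[stream_id] = False
        self.stream_threads[stream_id] = None
        self.websocket_list[stream_id] = None

    def remove_all_data_of_stream_id(self, stream_id: str) -> bool:
        """Delete stream_id from every per-stream dict.
        Returns True if at least one entry was removed."""
        removed = False
        for name in self.PER_STREAM_DICTS:
            per_stream: Dict[str, Any] = getattr(self, name)
            # Membership test instead of pop(): some stored values are None.
            if stream_id in per_stream:
                del per_stream[stream_id]
                removed = True
        return removed


mgr = ToyStreamManager()
mgr.create_stream("demo-stream", print)
mgr.remove_all_data_of_stream_id("demo-stream")
```

After the call, every dictionary in PER_STREAM_DICTS is empty again, and a second call returns False because nothing is left to remove.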

@oliver-zehentleitner oliver-zehentleitner self-assigned this Mar 4, 2023
@oliver-zehentleitner oliver-zehentleitner added the bug Something isn't working label Mar 4, 2023
@oliver-zehentleitner oliver-zehentleitner added enhancement New feature or request question Further information is requested and removed bug Something isn't working labels Mar 24, 2023
@Scoooooba

Hi Oliver,
thanks for providing an update on this issue.
I tested your minimal code example:

from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
import time

def callback_func(stream_data):
    print(stream_data)

ubwa = BinanceWebSocketApiManager(debug=True)
ubwa.print_summary()
stream_id = ubwa.create_stream('depth20@1000ms', 'BTCUSDT', output='dict', process_stream_data=callback_func)
time.sleep(5)
stopped = ubwa.stop_stream(stream_id)
ubwa.wait_till_stream_has_stopped(stream_id)
ubwa.print_summary()
deleted = ubwa.delete_stream_from_stream_list(stream_id)
ubwa.stop_manager_with_all_streams()


print(f"ubwa.specific_process_stream_data: {ubwa.specific_process_stream_data}")
print(f"ubwa.stream_threading_lock:        {ubwa.stream_threading_lock}")
print(f"ubwa.stream_list:                  {ubwa.stream_list}")
print(f"ubwa.event_loops:                  {ubwa.event_loops}")
print(f"ubwa.socket_is_ready:              {ubwa.socket_is_ready}")
print(f"ubwa.stream_threads:               {ubwa.stream_threads}")
print(f"ubwa.stream_threading_lock:        {ubwa.stream_threading_lock}")
print(f"ubwa.websocket_list:               {ubwa.websocket_list}")

This prints:

ubwa.specific_process_stream_data: {'f74947a228c4-9cb4-b278-b192-0821139f': <function callback_func at 0x0000022B71AFA160>}
ubwa.stream_threading_lock:        {'f74947a228c4-9cb4-b278-b192-0821139f': {'full_lock': <unlocked _thread.lock object at 0x0000022B74344B80>, 'receives_statistic_last_second_lock': <unlocked _thread.lock object at 0x0000022B741BB880>}}
ubwa.stream_list:                  {}
ubwa.event_loops:                  {'f74947a228c4-9cb4-b278-b192-0821139f': <_WindowsSelectorEventLoop running=False closed=True debug=False>}
ubwa.socket_is_ready:              {'f74947a228c4-9cb4-b278-b192-0821139f': True}
ubwa.stream_threads:               {'f74947a228c4-9cb4-b278-b192-0821139f': <Thread(_create_stream_thread: stream_id=f74947a228c4-9cb4-b278-b192-0821139f, time=1710263747.0250547, stopped 4676)>}
ubwa.stream_threading_lock:        {'f74947a228c4-9cb4-b278-b192-0821139f': {'full_lock': <unlocked _thread.lock object at 0x0000022B74344B80>, 'receives_statistic_last_second_lock': <unlocked _thread.lock object at 0x0000022B741BB880>}}
ubwa.websocket_list:               {'f74947a228c4-9cb4-b278-b192-0821139f': <websockets.legacy.client.WebSocketClientProtocol object at 0x0000022B73E5FB90>}

So it seems that some data remains even after the stream has been removed. I still see a potential issue here, because these dictionaries will keep growing if many streams are created and stopped over time.

I should mention that I tested this with version 1.46.2, so I don't know whether you have already reworked it in newer versions.
I'm only using your library in a small hobby project that I'm currently not willing to spend any money on, at least not until I'm fairly certain it will make me some profit ;)
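The leftover entries in the output above can also be detected programmatically instead of by reading the printed dictionaries. This is a minimal sketch with a hypothetical helper, find_leftover_entries(); the attribute names are passed in by the caller, so it makes no assumption about which dictionaries a given library version actually has. A SimpleNamespace stands in for the manager so the example runs without a network connection.

```python
from types import SimpleNamespace
from typing import Iterable, List


def find_leftover_entries(manager: object, stream_id: str,
                          attr_names: Iterable[str]) -> List[str]:
    """Return the names of dict attributes on `manager` that still contain
    `stream_id`. Hypothetical helper, not part of the library."""
    leftovers = []
    for name in attr_names:
        container = getattr(manager, name, None)
        # Missing attributes and non-dict attributes are skipped.
        if isinstance(container, dict) and stream_id in container:
            leftovers.append(name)
    return leftovers


# Fake manager shaped like the printed state: stream_list is already empty,
# but other dictionaries still hold the stopped stream.
fake = SimpleNamespace(stream_list={},
                       event_loops={"f749": "closed loop"},
                       websocket_list={"f749": "protocol object"})
print(find_leftover_entries(fake, "f749",
                            ["stream_list", "event_loops", "websocket_list"]))
# prints ['event_loops', 'websocket_list']
```

With a real manager instance, the same call with the attribute names listed in the original question would return an empty list once the stream's data has been fully removed.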

@oliver-zehentleitner
Member Author

Hello!

wait_till_stream_has_stopped() is fixed and the current versions offer a number of improvements.

I am currently preparing a new release, which will be published tomorrow. I can also implement the point we discussed in "Discussions" (#279 (comment)); that should still work out.

@oliver-zehentleitner oliver-zehentleitner modified the milestones: 2.1.0, 2.2.0 Mar 12, 2024
oliver-zehentleitner added a commit that referenced this issue Mar 13, 2024
@oliver-zehentleitner
Member Author

It's ready and released :)

https://pypi.org/project/unicorn-binance-websocket-api/2.2.0/

Set auto_data_cleanup_stopped_streams to True and there will be no more remaining data!

from unicorn_binance_websocket_api.manager import BinanceWebSocketApiManager
ubwa = BinanceWebSocketApiManager(auto_data_cleanup_stopped_streams=True)
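Note that the _auto_data_cleanup_stopped_streams() source quoted later in this thread only checks once per interval (default 60 seconds), so a script that inspects the dictionaries right after stopping a stream may still see entries for a while. A small polling helper with a timeout can bridge that gap. wait_for() is a hypothetical helper, not a library function, and the commented usage assumes that remove_all_data_of_stream_id() clears ubwa.stream_threads; verify that against the source of the version you use.

```python
import time
from typing import Callable


def wait_for(predicate: Callable[[], bool], timeout: float,
             poll_interval: float = 0.5) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse.
    Returns True on success, False on timeout. Hypothetical helper."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))


# Hypothetical usage (needs a network connection, so it is commented out):
# ubwa.stop_stream(stream_id)
# cleaned = wait_for(lambda: stream_id not in ubwa.stream_threads, timeout=90)
```

A timeout somewhat longer than the cleanup interval avoids false negatives caused by a check that happened just before the stream stopped.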

@Scoooooba

Hi Oliver,
looking at your source code, remove_all_data_of_stream_id() seems to do the job!

However, I stumbled over the _auto_data_cleanup_stopped_streams function:

    def _auto_data_cleanup_stopped_streams(self, interval=60):
        timestamp_last_check = 0
        while self.is_manager_stopping() is False:
            logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Starting with an interval "
                        f"of {interval} seconds!")
            if self.get_timestamp_unix() > timestamp_last_check + interval:
                timestamp_last_check = self.get_timestamp_unix()
                if self.auto_data_cleanup_stopped_streams is True:
                    stopped_streams = []
                    for stream_id in self.stream_list:
                        stopped_streams.append(stream_id)
                    for stream_id in stopped_streams:
                        try:
                            restart_status = self.restart_requests[stream_id].get("status")
                        except KeyError:
                            restart_status = None
                        if self.stream_list[stream_id]['status'] == "stopped" and restart_status != "new":
                            logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Removing "
                                        f"all remaining data of stream with stream_id={stream_id} from this instance!")
                            self.remove_all_data_of_stream_id(stream_id=stream_id)
                            logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Remaining "
                                        f"data of stream with stream_id={stream_id} successfully removed from this "
                                        f"instance!")
            time.sleep(1)

Isn't the logger.info(f"BinanceWebSocketApiManager._auto_data_cleanup_stopped_streams() - Starting with an interval " f"of {interval} seconds!") call on the wrong line? Shouldn't it come before the while statement? Otherwise, I guess this log entry will be written every second, since the loop sleeps for one second per iteration.
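The suggested fix can be sketched as a simplified standalone function: the start-up message is logged once before the loop, and the loop iterates over a snapshot of the stream IDs because remove() deletes entries while the loop runs. This is an illustration under assumptions, not the library's code: the manager's attributes are passed in as plain arguments, a threading.Event replaces is_manager_stopping(), and time.monotonic() replaces get_timestamp_unix().

```python
import logging
import threading
import time
from typing import Callable, Dict, Optional

logger = logging.getLogger(__name__)


def auto_cleanup_stopped_streams(stream_list: Dict[str, dict],
                                 restart_requests: Dict[str, dict],
                                 remove: Callable[[str], None],
                                 stop_event: threading.Event,
                                 interval: float = 60) -> None:
    # Logged once, before the loop, instead of on every one-second iteration.
    logger.info("auto_cleanup_stopped_streams() - Starting with an interval "
                "of %s seconds!", interval)
    last_check: Optional[float] = None
    while not stop_event.is_set():
        now = time.monotonic()
        if last_check is None or now - last_check >= interval:
            last_check = now
            # Snapshot of the keys: remove() deletes entries from stream_list,
            # and changing a dict's size while iterating it raises RuntimeError.
            for stream_id in list(stream_list):
                status = stream_list.get(stream_id, {}).get("status")
                restart_status = restart_requests.get(stream_id, {}).get("status")
                # Skip streams with a pending restart request, as in the original.
                if status == "stopped" and restart_status != "new":
                    remove(stream_id)
                    logger.info("Removed remaining data of stream_id=%s", stream_id)
        # Sleeps like time.sleep(1) but wakes up immediately once stopped.
        stop_event.wait(1)
```

Using stop_event.wait(1) instead of time.sleep(1) also lets the background thread exit promptly when the manager shuts down.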

@oliver-zehentleitner
Member Author

Right, thank you! I've already fixed it. There will be a new release in a few hours.


2 participants