Skip to content

Commit

Permalink
Implement channel status bulk-check logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DevilXD committed Sep 15, 2024
1 parent 06d495c commit b9dfc75
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 13 deletions.
33 changes: 25 additions & 8 deletions channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
if TYPE_CHECKING:
from twitch import Twitch
from gui import ChannelList
from constants import JsonType
from constants import JsonType, GQLOperation


logger = logging.getLogger("TwitchDrops")
Expand Down Expand Up @@ -173,6 +173,10 @@ def __eq__(self, other: object) -> bool:
def __hash__(self) -> int:
return self.id

@property
def stream_gql(self) -> GQLOperation:
return GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login})

@property
def name(self) -> str:
if self._display_name is not None:
Expand Down Expand Up @@ -245,11 +249,25 @@ def remove(self):
self._pending_stream_up = None
self._gui_channels.remove(self)

def external_update(self, channel_data: JsonType, available_drops: list[JsonType]):
"""
Update stream information based on data provided externally.
Used for bulk-updates of channel statuses during reload.
"""
if not channel_data["stream"]:
self._stream = None
return
stream = Stream.from_get_stream(self, channel_data)
if not stream.drops_enabled:
stream.drops_enabled = any(
bool(campaign["timeBasedDrops"]) for campaign in available_drops
)
self._stream = stream

async def get_stream(self) -> Stream | None:
try:
response: JsonType = await self._twitch.gql_request(
GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self._login})
)
response: JsonType = await self._twitch.gql_request(self.stream_gql)
except MinerException as exc:
raise MinerException(f"Channel: {self._login}") from exc
channel_data: JsonType | None = response["data"]["user"]
Expand Down Expand Up @@ -277,7 +295,7 @@ async def get_stream(self) -> Stream | None:
)
return stream

async def update_stream(self, *, trigger_events: bool) -> bool:
async def update_stream(self) -> bool:
"""
Fetches the current channel stream, and if one exists,
updates it's game, title, tags and viewers. Updates channel status in general.
Expand All @@ -287,8 +305,7 @@ async def update_stream(self, *, trigger_events: bool) -> bool:
"""
old_stream = self._stream
self._stream = await self.get_stream()
if trigger_events:
self._twitch.on_channel_update(self, old_stream, self._stream)
self._twitch.on_channel_update(self, old_stream, self._stream)
return self._stream is not None

async def _online_delay(self):
Expand All @@ -298,7 +315,7 @@ async def _online_delay(self):
"""
await asyncio.sleep(ONLINE_DELAY.total_seconds())
self._pending_stream_up = None # for 'display' to work properly
await self.update_stream(trigger_events=True) # triggers 'display' via the event
await self.update_stream()

def check_online(self):
"""
Expand Down
52 changes: 47 additions & 5 deletions twitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,11 +744,7 @@ async def _run(self):
# remove all ACL channels that already exist from the other set
acl_channels.difference_update(new_channels)
# use the other set to set them online if possible
# if acl_channels:
# await asyncio.gather(*(
# channel.update_stream(trigger_events=False)
# for channel in acl_channels
# ))
await self.bulk_check_online(acl_channels)
# finally, add them as new channels
new_channels.update(acl_channels)
for game in no_acl:
Expand Down Expand Up @@ -1610,3 +1606,49 @@ async def claim_points(self, channel_id: str | int, claim_id: str) -> None:
{"input": {"channelID": str(channel_id), "claimID": claim_id}}
)
)

async def bulk_check_online(self, channels: abc.Iterable[Channel]):
"""
Utilize batch GQL requests to check ONLINE status for a lot of channels at once.
Also handles the drops_enabled check.
"""
acl_streams_map: dict[int, JsonType] = {}
stream_gql_ops: list[GQLOperation] = [channel.stream_gql for channel in channels]
if not stream_gql_ops:
# shortcut for nothing to process
# NOTE: Have to do this here, becase "channels" can be any iterable
return
for coro in asyncio.as_completed([
self.gql_request(stream_gql_chunk)
for stream_gql_chunk in chunk(stream_gql_ops, 20)
]):
response_list: list[JsonType] = await coro
for response_json in response_list:
channel_data: JsonType = response_json["data"]["user"]
acl_streams_map[int(channel_data["id"])] = channel_data
# for all channels with an active stream, check the available drops as well
acl_available_drops_map: dict[int, list[JsonType]] = {}
available_gql_ops: list[GQLOperation] = [
GQL_OPERATIONS["AvailableDrops"].with_variables({"channelID": str(channel_id)})
for channel_id, channel_data in acl_streams_map.items()
if channel_data["stream"] is not None # only do this for ONLINE channels
]
for coro in asyncio.as_completed([
self.gql_request(available_gql_chunk)
for available_gql_chunk in chunk(available_gql_ops, 20)
]):
response_list = await coro
for response_json in response_list:
available_info: JsonType = response_json["data"]["channel"]
acl_available_drops_map[int(available_info["id"])] = (
available_info["viewerDropCampaigns"] or []
)
for channel in channels:
channel_id = channel.id
if channel_id not in acl_streams_map:
continue
channel_data = acl_streams_map[channel_id]
if channel_data["stream"] is None:
continue
available_drops: list[JsonType] = acl_available_drops_map[channel_id]
channel.external_update(channel_data, available_drops)

0 comments on commit b9dfc75

Please sign in to comment.