Skip to content

Commit

Permalink
event stream: generate random realtime connection ID on listener start
Browse files Browse the repository at this point in the history
Signed-off-by: Sumner Evans <[email protected]>
  • Loading branch information
sumnerevans committed Jan 19, 2024
1 parent 6d7b4f0 commit 63b4237
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions linkedin_messaging/linkedin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import json
import logging
import uuid

from bs4 import BeautifulSoup
from dataclasses_json.api import DataClassJsonMixin
Expand Down Expand Up @@ -131,7 +132,8 @@ class LinkedInMessaging:
],
]

_realtime_sesion_id: str = ""
_realtime_session_id: uuid.UUID
_realtime_connection_id: uuid.UUID | None = None

def __init__(self):
self.session = aiohttp.ClientSession()
Expand Down Expand Up @@ -545,7 +547,11 @@ async def _fire(self, payload_key: str, event: Any):
async def _listen_to_event_stream(self):
logging.info("Starting event stream listener")

headers = {"accept": "text/event-stream", **self._request_headers}
headers = {
"accept": "text/event-stream",
"x-li-realtime-session": str(self._realtime_session_id),
**self._request_headers,
}

async with self.session.get(
REALTIME_CONNECT_URL,
Expand Down Expand Up @@ -575,10 +581,7 @@ async def _listen_to_event_stream(self):

if cc := data.get("com.linkedin.realtimefrontend.ClientConnection", {}):
logging.info(f"Got realtime connection ID: {cc.get('id')}")
if not self._realtime_sesion_id:
logging.info("No existing realtime connection ID, setting the ID")
self._request_headers["x-li-realtime-session"] = cc.get("id")
self._realtime_sesion_id = cc.get("id")
self._realtime_connection_id = uuid.UUID(cc.get("id"))

event_payload = data.get("com.linkedin.realtimefrontend.DecoratedEvent", {}).get(
"payload", {}
Expand All @@ -596,17 +599,13 @@ async def _send_heartbeat(self, user_urn: URN):
await asyncio.sleep(60)
logging.info("Sending heartbeat")

if not self._realtime_sesion_id:
logging.warning("No realtime session ID. Skipping heartbeat.")
continue

await self._post(
CONNECTIVITY_TRACKING_URL,
params={"action": "sendHeartbeat"},
json={
"isFirstHeartbeat": False,
"isLastHeartbeat": False,
"realtimeSessionId": self._realtime_sesion_id,
"realtimeSessionId": str(self._realtime_session_id),
"mpName": "voyager-web",
"mpVersion": "1.13.8094",
"clientId": "voyager-web",
Expand All @@ -616,6 +615,8 @@ async def _send_heartbeat(self, user_urn: URN):
)

async def start_listener(self, user_urn: URN):
self._realtime_session_id = uuid.uuid4()
logging.info(f"Created realtime session ID: {self._realtime_session_id}")
while True:
try:
self._heartbeat_task = asyncio.create_task(self._send_heartbeat(user_urn))
Expand Down

0 comments on commit 63b4237

Please sign in to comment.