Skip to content

Apply the retry code to the async pubsub client#8503

Merged
steveluscher merged 2 commits intoanza-xyz:masterfrom
steveluscher:pubsub-just-use-the-async-impl
Oct 20, 2025
Merged

Apply the retry code to the async pubsub client#8503
steveluscher merged 2 commits intoanza-xyz:masterfrom
steveluscher:pubsub-just-use-the-async-impl

Conversation

@steveluscher
Copy link
Copy Markdown

Four years ago, retry was added to the sync client. Never to the async client.

UNTIL TODAY.

Test plan

Create a test server

import http from "http";

import { WebSocketServer } from "ws";

let attemptCount = 0;

const server = http.createServer();
const wss = new WebSocketServer({ noServer: true });

wss.on("connection", (ws) => {
  ws.send("Connection accepted.");
  ws.on("message", (msg) => console.log(`Received: ${msg}`));
});

server.on("upgrade", (req, socket, head) => {
  attemptCount += 1;

  if (attemptCount <= 4) {
    socket.write("HTTP/1.1 429 Too Many Requests\r\n\r\n");
    socket.destroy();
    console.log(`Rejected connection #${attemptCount} (429)`);
    return;
  }

  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit("connection", ws, req);
    console.log("Connection accepted on attempt", attemptCount);
  });
});

server.listen(8080, () => {
  console.log("Server listening on port 8080");
});

Run test_slot_subscription_async:

Rejected connection #1 (429)
Rejected connection #2 (429)
Rejected connection #3 (429)
Rejected connection #4 (429)
Connection accepted on attempt 5
Received: {"id":1,"jsonrpc":"2.0","method":"slotSubscribe","params":[]}

Create a test server

```ts
import http from "http";

import { WebSocketServer } from "ws";

let attemptCount = 0;

const server = http.createServer();
const wss = new WebSocketServer({ noServer: true });

wss.on("connection", (ws) => {
  ws.send("Connection accepted.");
  ws.on("message", (msg) => console.log(`Received: ${msg}`));
});

server.on("upgrade", (req, socket, head) => {
  attemptCount += 1;

  if (attemptCount <= 4) {
    socket.write("HTTP/1.1 429 Too Many Requests\r\n\r\n");
    socket.destroy();
    console.log(`Rejected connection #${attemptCount} (429)`);
    return;
  }

  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit("connection", ws, req);
    console.log("Connection accepted on attempt", attemptCount);
  });
});

server.listen(8080, () => {
  console.log("Server listening on port 8080");
});
```

Run `test_slot_subscription_async`:

```
Rejected connection #1 (429)
Rejected connection #2 (429)
Rejected connection anza-xyz#3 (429)
Rejected connection anza-xyz#4 (429)
Connection accepted on attempt 5
Received: {"id":1,"jsonrpc":"2.0","method":"slotSubscribe","params":[]}
```
@steveluscher steveluscher added the automerge automerge Merge this Pull Request automatically once CI passes label Oct 15, 2025
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Oct 15, 2025

Codecov Report

❌ Patch coverage is 0% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.1%. Comparing base (0199a3a) to head (6f0e8e8).
⚠️ Report is 50 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff            @@
##           master    #8503    +/-   ##
========================================
  Coverage    83.1%    83.1%            
========================================
  Files         840      840            
  Lines      367669   367698    +29     
========================================
+ Hits       305719   305875   +156     
+ Misses      61950    61823   -127     
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@steveluscher steveluscher marked this pull request as ready for review October 16, 2025 15:15
joncinque
joncinque previously approved these changes Oct 20, 2025
Copy link
Copy Markdown

@joncinque joncinque left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me! Just the nit on naming, which you can take or leave

Comment thread pubsub-client/src/nonblocking/pubsub_client.rs Outdated
@anza-team anza-team removed the automerge automerge Merge this Pull Request automatically once CI passes label Oct 20, 2025
@anza-team
Copy link
Copy Markdown
Collaborator

😱 New commits were pushed while the automerge label was present.

@steveluscher steveluscher added the automerge automerge Merge this Pull Request automatically once CI passes label Oct 20, 2025
Copy link
Copy Markdown

@joncinque joncinque left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, thanks!

@steveluscher steveluscher added this pull request to the merge queue Oct 20, 2025
Merged via the queue into anza-xyz:master with commit 453b75f Oct 20, 2025
45 checks passed
@steveluscher steveluscher deleted the pubsub-just-use-the-async-impl branch October 20, 2025 19:54
rustopian pushed a commit to rustopian/agave that referenced this pull request Nov 20, 2025
* Apply the retry code to the async pubsub client

Create a test server

```ts
import http from "http";

import { WebSocketServer } from "ws";

let attemptCount = 0;

const server = http.createServer();
const wss = new WebSocketServer({ noServer: true });

wss.on("connection", (ws) => {
  ws.send("Connection accepted.");
  ws.on("message", (msg) => console.log(`Received: ${msg}`));
});

server.on("upgrade", (req, socket, head) => {
  attemptCount += 1;

  if (attemptCount <= 4) {
    socket.write("HTTP/1.1 429 Too Many Requests\r\n\r\n");
    socket.destroy();
    console.log(`Rejected connection #${attemptCount} (429)`);
    return;
  }

  wss.handleUpgrade(req, socket, head, (ws) => {
    wss.emit("connection", ws, req);
    console.log("Connection accepted on attempt", attemptCount);
  });
});

server.listen(8080, () => {
  console.log("Server listening on port 8080");
});
```

Run `test_slot_subscription_async`:

```
Rejected connection #1 (429)
Rejected connection #2 (429)
Rejected connection #3 (429)
Rejected connection #4 (429)
Connection accepted on attempt 5
Received: {"id":1,"jsonrpc":"2.0","method":"slotSubscribe","params":[]}
```

* `s/async_with_retry/with_retry/`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

automerge automerge Merge this Pull Request automatically once CI passes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants