12 changes: 10 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

37 changes: 37 additions & 0 deletions site/pages/docs/clients/transports/http.md
@@ -59,6 +59,24 @@ const [blockNumber, balance, ensName] = await Promise.all([
])
```

You can also use the `waitAsRateLimit` option to send at most one batch per `wait` milliseconds once the batch size is reached.

```ts twoslash
import { createPublicClient, http } from 'viem'
import { mainnet } from 'viem/chains'

const client = createPublicClient({
chain: mainnet,
transport: http('https://1.rpc.thirdweb.com/...', {
batch: {
batchSize: 3,
wait: 100,
waitAsRateLimit: true, // [!code focus]
},
}),
})
```
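
With the configuration above, nine requests scheduled at once are split into three batches of three, dispatched roughly 100 milliseconds apart. An illustrative sketch (the requests and endpoint are placeholders):

```ts twoslash
import { createPublicClient, http } from 'viem'
import { mainnet } from 'viem/chains'

const client = createPublicClient({
  chain: mainnet,
  transport: http('https://1.rpc.thirdweb.com/...', {
    batch: { batchSize: 3, wait: 100, waitAsRateLimit: true },
  }),
})

// Nine near-simultaneous requests: at most one JSON-RPC batch
// (of up to three calls) is dispatched per 100 ms window.
const blocks = await Promise.all(
  Array.from({ length: 9 }, (_, i) => client.getBlock({ blockNumber: BigInt(i) })),
)
```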

## Parameters

### url (optional)
@@ -123,6 +141,25 @@ const transport = http('https://1.rpc.thirdweb.com/...', {
})
```

### batch.waitAsRateLimit (optional)

- **Type:** `boolean`
- **Default:** `false`

Send at most one batch per `wait` milliseconds once the batch size is reached. By default, multiple batches may be sent at once every `wait` milliseconds.

Warning: This can lead to a high number of pending requests if the batch size is constantly exceeded without enough time to clear the queue.

```ts twoslash
import { http } from 'viem'
// ---cut---
const transport = http('https://1.rpc.thirdweb.com/...', {
batch: {
waitAsRateLimit: true // [!code focus]
}
})
```
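
To make the warning concrete: with `waitAsRateLimit`, at most `batchSize` requests drain every `wait` milliseconds, so a sustained arrival rate above that bound grows the queue without limit. A back-of-the-envelope sketch (the rates are hypothetical):

```ts twoslash
const batchSize = 3
const wait = 500 // ms

// At most `batchSize` requests are sent every `wait` ms.
const drainPerSecond = batchSize * (1000 / wait) // 6 requests/s

// If requests keep arriving faster than they drain,
// the pending queue grows by the difference.
const arrivalsPerSecond = 10
const backlogGrowthPerSecond = arrivalsPerSecond - drainPerSecond // 4 requests/s
```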

### fetchOptions (optional)

- **Type:** [`RequestInit`](https://developer.mozilla.org/en-US/docs/Web/API/fetch)
17 changes: 14 additions & 3 deletions src/actions/public/call.ts
@@ -375,9 +375,20 @@ async function scheduleMulticall<chain extends Chain | undefined>(
const { schedule } = createBatchScheduler({
id: `${client.uid}.${block}`,
wait,
-    shouldSplitBatch(args) {
-      const size = args.reduce((size, { data }) => size + (data.length - 2), 0)
-      return size > batchSize * 2
+    getBatchSize: (args) => {
+      let accumulatedEffectiveSize = 0
+      let itemCount = 0
+
+      for (const item of args) {
+        const itemEffectiveSize = Math.max(0, item.data.length - 2)
+
+        if (accumulatedEffectiveSize + itemEffectiveSize > batchSize) break
+
+        accumulatedEffectiveSize += itemEffectiveSize
+        itemCount++
+      }
+
+      return itemCount
},
fn: async (
requests: {
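Read on its own, the new callback is a greedy accumulator: instead of merely signaling that a split is needed (as `shouldSplitBatch` did), it returns how many queued requests fit within the calldata budget. A minimal standalone sketch, using an illustrative `Request` type rather than viem's internal one:

```ts
type Request = { data: string } // 0x-prefixed calldata

function getBatchSize(args: Request[], batchSize: number): number {
  let accumulated = 0
  let count = 0
  for (const { data } of args) {
    const size = Math.max(0, data.length - 2) // hex chars, minus the `0x` prefix
    if (accumulated + size > batchSize) break // stop before exceeding the budget
    accumulated += size
    count++
  }
  return count
}

// Budget of 10: 4 + 4 fits; adding 8 more would overflow, so 2 items are batched.
const fitted = getBatchSize(
  [{ data: '0xdead' }, { data: '0xbeef' }, { data: '0xcafebabe' }],
  10,
) // 2
```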
19 changes: 14 additions & 5 deletions src/clients/transports/http.ts
@@ -34,6 +34,12 @@ export type HttpTransportConfig<
batchSize?: number | undefined
/** The maximum number of milliseconds to wait before sending a batch. @default 0 */
wait?: number | undefined
+        /**
+         * Send at most one batch per `wait` milliseconds once the batch size is reached. By default, multiple batches may be sent at once every `wait` milliseconds.
+         *
+         * Warning: This can lead to a high number of pending requests if the batch size is constantly exceeded without enough time to clear the queue.
+         * @default false */
+        waitAsRateLimit?: boolean | undefined
}
| undefined
/**
@@ -102,9 +108,13 @@ export function http<
retryDelay,
raw,
} = config

return ({ chain, retryCount: retryCount_, timeout: timeout_ }) => {
-    const { batchSize = 1000, wait = 0 } =
-      typeof batch === 'object' ? batch : {}
+    const {
+      batchSize = 1000,
+      wait = 0,
+      waitAsRateLimit = false,
+    } = typeof batch === 'object' ? batch : {}
const retryCount = config.retryCount ?? retryCount_
const timeout = timeout_ ?? config.timeout ?? 10_000
const url_ = url || chain?.rpcUrls.default.http[0]
@@ -128,9 +138,8 @@
const { schedule } = createBatchScheduler({
id: url_,
wait,
-        shouldSplitBatch(requests) {
-          return requests.length > batchSize
-        },
+        getBatchSize: () => batchSize,
+        waitAsRateLimit,
fn: (body: RpcRequest[]) =>
rpcClient.request({
body,
90 changes: 43 additions & 47 deletions src/utils/promise/createBatchScheduler.test.ts
@@ -117,13 +117,13 @@ test('args: wait', async () => {
expect(x10).toEqual([10, [8, 9, 10]])
})

-test('args: shouldSplitBatch', async () => {
+test('args: getBatchSize', async () => {
const fn = vi.fn()
const { schedule } = createBatchScheduler({
id: uid(),
// biome-ignore lint/style/noCommaOperator:
fn: async (args: number[]) => (fn(), args),
-    shouldSplitBatch: (args) => args.length > 3,
+    getBatchSize: () => 3,
})

const p = []
@@ -163,6 +163,47 @@ test('args: shouldSplitBatch', async () => {
expect(fn).toBeCalledTimes(6)
})

+test('args: waitAsRateLimit', async () => {
+  const date = Date.now()
+
+  const fn = vi.fn()
+  const { schedule } = createBatchScheduler({
+    id: uid(),
+    getBatchSize: () => 3,
+    wait: 500,
+    waitAsRateLimit: true,
+    // biome-ignore lint/style/noCommaOperator:
+    fn: async (args: number[]) => (fn(), args),
+  })
+
+  const p = []
+
+  p.push(schedule(1))
+  p.push(schedule(2))
+  p.push(schedule(3))
+  p.push(schedule(4))
+  p.push(schedule(5))
+  p.push(schedule(6))
+  p.push(schedule(7))
+  p.push(schedule(8))
+  p.push(schedule(9))
+
+  const [x1, x2, x3, x4, x5, x6, x7, x8, x9] = await Promise.all(p)
+
+  expect(x1).toEqual([1, [1, 2, 3]])
+  expect(x2).toEqual([2, [1, 2, 3]])
+  expect(x3).toEqual([3, [1, 2, 3]])
+  expect(x4).toEqual([4, [4, 5, 6]])
+  expect(x5).toEqual([5, [4, 5, 6]])
+  expect(x6).toEqual([6, [4, 5, 6]])
+  expect(x7).toEqual([7, [7, 8, 9]])
+  expect(x8).toEqual([8, [7, 8, 9]])
+  expect(x9).toEqual([9, [7, 8, 9]])
+
+  const elapsed = Date.now() - date
+  expect(elapsed).toBeGreaterThanOrEqual(1500)
+})

describe('behavior', () => {
test('complex args', async () => {
const { schedule } = createBatchScheduler({
@@ -188,51 +229,6 @@ describe('behavior', () => {
])
})

-  test('complex split batch', async () => {
-    const fn = vi.fn()
-    const { schedule } = createBatchScheduler({
-      id: uid(),
-      wait: 16,
-      // biome-ignore lint/style/noCommaOperator:
-      fn: async (args: string[]) => (fn(), args),
-      shouldSplitBatch: (args) =>
-        args.reduce((acc, x) => acc + x.length, 0) > 20,
-    })
-
-    const p = []
-    p.push(schedule('hello'))
-    p.push(schedule('world'))
-    p.push(schedule('this is me'))
-    p.push(schedule('life should be'))
-    p.push(schedule('fun for everyone'))
-    await wait(1)
-    p.push(schedule('hello world'))
-    p.push(schedule('come and see'))
-    p.push(schedule('come'))
-    p.push(schedule('and'))
-    await wait(16)
-    p.push(schedule('see'))
-    p.push(schedule('smile'))
-    p.push(schedule('just be yourself'))
-    p.push(schedule('be happy'))
-
-    const [x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11] = await Promise.all(p)
-
-    expect(x1).toEqual(['hello', ['hello', 'world', 'this is me']])
-    expect(x2).toEqual(['world', ['hello', 'world', 'this is me']])
-    expect(x3).toEqual(['this is me', ['hello', 'world', 'this is me']])
-    expect(x4).toEqual(['life should be', ['life should be']])
-    expect(x5).toEqual(['fun for everyone', ['fun for everyone']])
-    expect(x6).toEqual(['hello world', ['hello world']])
-    expect(x7).toEqual(['come and see', ['come and see', 'come', 'and']])
-    expect(x8).toEqual(['come', ['come and see', 'come', 'and']])
-    expect(x9).toEqual(['and', ['come and see', 'come', 'and']])
-    expect(x10).toEqual(['see', ['see', 'smile']])
-    expect(x11).toEqual(['smile', ['see', 'smile']])
-
-    expect(fn).toBeCalledTimes(8)
-  })

test('throws error', async () => {
const { schedule } = createBatchScheduler({
id: uid(),
52 changes: 33 additions & 19 deletions src/utils/promise/createBatchScheduler.ts
@@ -20,8 +20,9 @@ type CreateBatchSchedulerArguments<
> = {
fn: (args: parameters[]) => Promise<returnType>
id: number | string
-  shouldSplitBatch?: ((args: parameters[]) => boolean) | undefined
+  getBatchSize?: ((args: parameters[]) => number) | undefined
wait?: number | undefined
+  waitAsRateLimit?: boolean
sort?: BatchResultsCompareFn<returnType[number]> | undefined
}

@@ -46,56 +47,69 @@ export function createBatchScheduler<
>({
fn,
id,
-  shouldSplitBatch,
+  getBatchSize,
wait = 0,
+  waitAsRateLimit = false,
sort,
}: CreateBatchSchedulerArguments<
parameters,
returnType
>): CreateBatchSchedulerReturnType<parameters, returnType> {
const exec = async () => {
-    const scheduler = getScheduler()
-    flush()
+    const items = getBatchItems()
+    const args = items.map(({ args }) => args)

-    const args = scheduler.map(({ args }) => args)
-
-    if (args.length === 0) return
+    if (args.length === 0) {
+      flush()
+      return
+    }

fn(args as parameters[])
.then((data) => {
if (sort && Array.isArray(data)) data.sort(sort)
-        for (let i = 0; i < scheduler.length; i++) {
-          const { resolve } = scheduler[i]
+        for (let i = 0; i < items.length; i++) {
+          const { resolve } = items[i]
resolve?.([data[i], data])
}
})
.catch((err) => {
-        for (let i = 0; i < scheduler.length; i++) {
-          const { reject } = scheduler[i]
+        for (let i = 0; i < items.length; i++) {
+          const { reject } = items[i]
reject?.(err)
}
})
+
+    if (waitAsRateLimit) {
+      setTimeout(() => {
+        clearBatchItems(items.length)
+        exec()
+      }, wait)
+      return
+    }
+
+    clearBatchItems(items.length)
+    exec()
}

const flush = () => schedulerCache.delete(id)

-  const getBatchedArgs = () =>
-    getScheduler().map(({ args }) => args) as parameters[]
-
const getScheduler = () => schedulerCache.get(id) || []

const setScheduler = (item: SchedulerItem) =>
schedulerCache.set(id, [...getScheduler(), item])
+  const getBatchedArgs = () =>
+    getScheduler().map(({ args }) => args) as parameters[]
+
+  // if batchSize is undefined, it takes all items
+  const batchSize = getBatchSize?.(getBatchedArgs())
+  const getBatchItems = () => getScheduler().slice(0, batchSize)
+  const clearBatchItems = (amount: number) =>
+    schedulerCache.set(id, getScheduler().slice(amount))

return {
flush,
async schedule(args: parameters) {
const { promise, resolve, reject } = withResolvers()

-      const split = shouldSplitBatch?.([...getBatchedArgs(), args])
-
-      if (split) exec()
-
const hasActiveScheduler = getScheduler().length > 0
if (hasActiveScheduler) {
setScheduler({ args, resolve, reject })
Expand Down