Skip to content

Commit

Permalink
feat(cli): allow setting --max-fetch-concurrency to prevent stalled…
Browse files Browse the repository at this point in the history
… validators (#7450)
  • Loading branch information
ricokahler authored Sep 3, 2024
1 parent 8bc721b commit 85b0538
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface ValidateFlags {
'file'?: string
'level'?: 'error' | 'warning' | 'info'
'max-custom-validation-concurrency'?: number
'max-fetch-concurrency'?: number
'yes'?: boolean
'y'?: boolean
}
Expand Down Expand Up @@ -103,6 +104,15 @@ export default async function validateAction(
throw new Error(`'--max-custom-validation-concurrency' must be an integer.`)
}

const maxFetchConcurrency = flags['max-fetch-concurrency']
if (
maxFetchConcurrency &&
typeof maxFetchConcurrency !== 'number' &&
!Number.isInteger(maxFetchConcurrency)
) {
throw new Error(`'--max-fetch-concurrency' must be an integer.`)
}

const clientConfig: Partial<ClientConfig> = {
...apiClient({
requireUser: true,
Expand Down Expand Up @@ -140,6 +150,7 @@ export default async function validateAction(
workDir,
level,
maxCustomValidationConcurrency,
maxFetchConcurrency,
ndjsonFilePath,
reporter: (worker) => {
const reporter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import {
} from '../../threads/validateDocuments'
import {createReceiver, type WorkerChannelReceiver} from '../../util/workerChannels'

const DEFAULT_MAX_CUSTOM_VALIDATION_CONCURRENCY = 5

export interface ValidateDocumentsOptions<TReturn = unknown> {
level?: 'error' | 'warning' | 'info'
workspace?: string
Expand All @@ -23,6 +21,7 @@ export interface ValidateDocumentsOptions<TReturn = unknown> {
dataset?: string // override
ndjsonFilePath?: string
maxCustomValidationConcurrency?: number
maxFetchConcurrency?: number
reporter?: (worker: WorkerChannelReceiver<ValidationWorkerChannel>) => TReturn
}

Expand Down Expand Up @@ -72,6 +71,7 @@ export function validateDocuments(options: ValidateDocumentsOptions): unknown {
reporter = defaultReporter,
level,
maxCustomValidationConcurrency,
maxFetchConcurrency,
ndjsonFilePath,
} = options

Expand Down Expand Up @@ -100,8 +100,8 @@ export function validateDocuments(options: ValidateDocumentsOptions): unknown {
projectId,
level,
ndjsonFilePath,
maxCustomValidationConcurrency:
maxCustomValidationConcurrency ?? DEFAULT_MAX_CUSTOM_VALIDATION_CONCURRENCY,
maxCustomValidationConcurrency,
maxFetchConcurrency,
} satisfies ValidateDocumentsWorkerData,
// eslint-disable-next-line no-process-env
env: process.env,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Options
--format <pretty|ndjson|json> The output format used to print the found validation markers and report progress.
--level <error|warning|info> The minimum level reported out. Defaults to warning.
--max-custom-validation-concurrency <number> Specify how many custom validators can run concurrently. Defaults to 5.
--max-fetch-concurrency <number> Specify how many \`client.fetch\` requests are allow concurrency at once. Defaults to 25.
Examples
# Validates all documents in a Sanity project with more than one workspace
Expand All @@ -20,7 +21,7 @@ Examples
sanity documents validate --workspace default --dataset staging
# Save the results of the report into a file
sanity documents validate > report.txt
sanity documents validate --yes > report.txt
# Report out info level validation markers too
sanity documents validate --level info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export interface ValidateDocumentsWorkerData {
ndjsonFilePath?: string
level?: ValidationMarker['level']
maxCustomValidationConcurrency?: number
maxFetchConcurrency?: number
}

/** @internal */
Expand Down Expand Up @@ -79,6 +80,7 @@ const {
projectId,
level,
maxCustomValidationConcurrency,
maxFetchConcurrency,
} = _workerData as ValidateDocumentsWorkerData

if (isMainThread || !parentPort) {
Expand Down Expand Up @@ -359,6 +361,7 @@ async function validateDocuments() {
getDocumentExists,
environment: 'cli',
maxCustomValidationConcurrency,
maxFetchConcurrency,
}),
new Promise<typeof timeout>((resolve) =>
setTimeout(() => resolve(timeout), DOCUMENT_VALIDATION_TIMEOUT),
Expand Down
44 changes: 38 additions & 6 deletions packages/sanity/src/core/validation/validateDocument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,23 @@ import {typeString} from './util/typeString'
import {unknownFieldsValidator} from './validators/unknownFieldsValidator'

// this is the number of requests allowed inflight at once. this is done to prevent
// the validation library from overwhelming our backend
const MAX_FETCH_CONCURRENCY = 10

const limitConcurrency = createClientConcurrencyLimiter(MAX_FETCH_CONCURRENCY)
// the validation library from overwhelming our backend.
// NOTE: this was upped from 10 to prevent issues where many concurrency
// `client.fetch` requests would "clog" custom validators from finishing due to
// not enough concurrent requests being fulfilled
//
// NOTE: ensure to update the TSDoc and CLI help test if this is changed
const DEFAULT_MAX_FETCH_CONCURRENCY = 25

// NOTE: ensure to update the TSDoc and CLI help test if this is changed
const DEFAULT_MAX_CUSTOM_VALIDATION_CONCURRENCY = 5

let _limitConcurrency: ReturnType<typeof createClientConcurrencyLimiter> | undefined
const getConcurrencyLimiter = (maxConcurrency: number) => {
if (_limitConcurrency) return _limitConcurrency
_limitConcurrency = createClientConcurrencyLimiter(maxConcurrency)
return _limitConcurrency
}

const isRecord = (maybeRecord: unknown): maybeRecord is Record<string, unknown> =>
typeof maybeRecord === 'object' && maybeRecord !== null && !Array.isArray(maybeRecord)
Expand Down Expand Up @@ -104,8 +117,21 @@ export interface ValidateDocumentOptions {
* concurrently at once. This helps prevent custom validators from
* overwhelming backend services (e.g. called via fetch) used in async,
* user-defined validation functions. (i.e. `rule.custom(async() => {})`)
*
* Note that lowering this number may also help in cases where a custom
* validator could potentially exhaust the fetch concurrency. This is 5 by
* default.
*/
maxCustomValidationConcurrency?: number

/**
* The amount of allowed inflight fetch requests at once. You may need to up
* this value if you have complex custom validations that require many
* `client.fetch` requests at once. It's possible for custom validator to
* stall if there are not enough concurrent fetch requests available to
* fullfil the custom validation. This is 25 by default.
*/
maxFetchConcurrency?: number
}

/**
Expand All @@ -118,9 +144,13 @@ export function validateDocument({
document,
workspace,
environment = 'studio',
maxFetchConcurrency,
...options
}: ValidateDocumentOptions): Promise<ValidationMarker[]> {
const getClient = options.getClient || workspace.getClient
const limitConcurrency = getConcurrencyLimiter(
maxFetchConcurrency ?? DEFAULT_MAX_FETCH_CONCURRENCY,
)
const getConcurrencyLimitedClient = (clientOptions: SourceClientOptions) =>
limitConcurrency(getClient(clientOptions))

Expand Down Expand Up @@ -190,8 +220,10 @@ export function validateDocumentObservable({
}

let customValidationConcurrencyLimiter = customValidationConcurrencyLimiters.get(schema)
if (!customValidationConcurrencyLimiter && maxCustomValidationConcurrency) {
customValidationConcurrencyLimiter = new ConcurrencyLimiter(maxCustomValidationConcurrency)
if (!customValidationConcurrencyLimiter) {
customValidationConcurrencyLimiter = new ConcurrencyLimiter(
maxCustomValidationConcurrency ?? DEFAULT_MAX_CUSTOM_VALIDATION_CONCURRENCY,
)
customValidationConcurrencyLimiters.set(schema, customValidationConcurrencyLimiter)
}

Expand Down

0 comments on commit 85b0538

Please sign in to comment.