Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[node-core-library] Provide async utilities in core library #2665

Merged
merged 7 commits into from
May 1, 2021

Conversation

elliot-nelson
Copy link
Collaborator

@elliot-nelson elliot-nelson commented Apr 30, 2021

Summary

I needed a "map limit" implementation and did not see one in the existing codebase; however, @iclanton recently added a "forEach limit" implementation to the heft source.

This PR moves that function over to node-core-library instead so other projects can use it, and adds a couple other commonly used asynchronous functions.

(I'm not the first one to have this idea... looks like it was discussed before in pending PR #2431, although in that case it was exporting from heft.)

Details

  • Bring forEachLimitAsync over to a new module, Async, from heft.
  • Add a sibling method mapLimitAsync.
  • Enable the optional second param index for the callback function (to match the signature of Array#map and Array#forEach).
  • Add one more utility method Async.sleep. (In JavaScript I don't really bother, it's easy to make an inline setTimeout, but it's nice to have the method w/signature around for TypeScript in my opinion.)

The previous implementation of forEachLimitAsync has been left in heft and just calls through to the core library now (this is an area I could potentially clean up more).

@octogonz suggested an alternate approach here, which is to bring in a new third party library that does this. I often pull bottleneck into applications I'm working on, for example, to provide generic rate limiting. I'm open to that approach as well, but given that the guts were already implemented, this PR seems the easier option.

I've marked the new Async export as @public -- making this available to monorepo maintainers for use in repo-tasks, etc. (and not just for rushstack internal repo) seems pretty useful.

How it was tested

  • New unit tests for forEachLimitAsync and mapLimitAsync, to validate basic behavior.

/**
* Return a promise that resolves after the specified number of milliseconds.
*/
public static async sleep(ms: number): Promise<void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful for this to accept a cancellation token, but we can add that later.

public static async mapLimitAsync<TEntry, TRetVal>(
array: TEntry[],
parallelismLimit: number,
fn: ((entry: TEntry) => Promise<TRetVal>) | ((entry: TEntry, index: number) => Promise<TRetVal>)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn: ((entry: TEntry) => Promise<TRetVal>) | ((entry: TEntry, index: number) => Promise<TRetVal>)
fn: (entry: TEntry, index: number) => Promise<TRetVal>

Someone can choose to just ignore the index parameter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this error if you pass a callback to the function that doesn't have an index param?

I guess to me this seems pretty unfriendly and veers further from the typical Array.map / Array.forEach behaviors, which I like to emulate because they are the baseline people are familiar with.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's what the Array.map typings say, I guess just do that.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I reacted too early to an error I was seeing and made an assumption -- your example looks good, and simpler is probably better! I'll use the one without the algebra.

public static async forEachLimitAsync<TEntry>(
array: TEntry[],
parallelismLimit: number,
fn: ((entry: TEntry) => Promise<void>) | ((entry: TEntry, index: number) => Promise<void>)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn: ((entry: TEntry) => Promise<void>) | ((entry: TEntry, index: number) => Promise<void>)
fn: (entry: TEntry, index: number) => Promise<void>

parallelismLimit: number,
fn: ((entry: TEntry) => Promise<void>) | ((entry: TEntry, index: number) => Promise<void>)
): Promise<void> {
return new Promise((resolve: () => void, reject: (error: Error) => void) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return new Promise((resolve: () => void, reject: (error: Error) => void) => {
await new Promise((resolve: () => void, reject: (error: Error) => void) => {

* Return a promise that resolves after the specified number of milliseconds.
*/
public static async sleep(ms: number): Promise<void> {
return new Promise((resolve) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return new Promise((resolve) => {
await new Promise((resolve) => {

while (operationsInProgress < parallelismLimit) {
if (index < array.length) {
operationsInProgress++;
fn(array[index], index++)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if fn throws a synchronous exception?

@elliot-nelson
Copy link
Collaborator Author

After a long conversation with @octogonz, he's convinced me that it might be better to tweak the API for the async utilities.

  • Both Async.forEachAsync and Async.mapAsync now start out with signatures just like their corresponding Array friends, and take an optional "options" object. Without any options, these are now just nicer wrappers for the equivalent of Promise.all([]).
  • The "options" object has just one key today, concurrency: number. This makes the signature Async.mapAsync(items, fn, { concurrency: 10 }) identical to the classic Bluebird Promise.map signature, which I think is another familiar look for many developers.

In addition to the benefits of familiarity, having an options object that can be extended later gives a nice place to stash features that might come up (perhaps we want a different type of rate-limiting, like a per-second or debounce type behavior, instead of a concurrency limit; or maybe we want a boolean to control whether an error short-circuits the rest of the list or is guaranteed to execute all operations, etc.).

One other change I made is to add a little bit of protection against a synchronous function / synchronous throw. You shouldn't be able to encounter this if you're writing valid TypeScript, but you could imagine for example, someone writing a repo-task tool and using Async.forEachAsync(items, VendorPackage.write)... Maybe VendorPackage.write has no typings and is just any, or maybe it's typed as Promise<void> but not correctly wrapped, etc.

Last, I changed the definition from @public to @beta for the new functions in case we want to revisit.

@octogonz octogonz merged commit c2c27e2 into microsoft:master May 1, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants