Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up HTTP async iterator code #411

Merged
merged 22 commits into from
May 20, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 44 additions & 64 deletions http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,102 +249,82 @@ export class ServerRequest {
}

/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
* bufr is empty on a fresh TCP connection.
* Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
async function readRequest(
c: HttpConn,
bufr: BufReader
): Promise<[ServerRequest, BufState]> {
async function* iterateHttpRequests(
ry marked this conversation as resolved.
Show resolved Hide resolved
c: HttpConn
): AsyncIterableIterator<[ServerRequest | null, BufState]> {
const bufr = new BufReader(c);
const bufw = new BufWriter(c);
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;
const tp = new TextProtoReader(bufr!);

let s: string;
let err: BufState;

// First line: GET /index.html HTTP/1.0
[s, err] = await tp.readLine();
if (err) {
return [null, err];
}
[req.method, req.url, req.proto] = s.split(" ", 3);

[req.headers, err] = await tp.readMIMEHeader();
for (;;) {
const req = new ServerRequest();

// Set and incr pipeline id;
req.pipelineId = ++c.lastPipelineId;
// Set a new pipeline deferred associated with this request
// for future requests to wait for.
c.pendingDeferredMap.set(req.pipelineId, deferred());

req.conn = c;
req.r = bufr!;
req.w = bufw;

// First line: GET /index.html HTTP/1.0
const tp = new TextProtoReader(bufr!);
ry marked this conversation as resolved.
Show resolved Hide resolved
let [s, err]: [string, BufState] = await tp.readLine();
if (err) {
yield [null, err];
return;
}

return [req, err];
[req.method, req.url, req.proto] = s.split(" ", 3);
[req.headers, err] = await tp.readMIMEHeader();
yield [req, err];
ry marked this conversation as resolved.
Show resolved Hide resolved
}
}

export class Server implements AsyncIterableIterator<ServerRequest> {
export class Server implements AsyncIterable<ServerRequest> {
private closing = false;
private looping = false;
private channel = new Channel<ServerRequest>();

constructor(public listener: Listener) {}

async acceptLoop(): Promise<void> {
assert(!this.looping);
this.looping = true;
try {
while (!this.closing) {
const conn = await this.listener.accept();
this.serveConn(conn); // async!
}
} finally {
this.looping = false;
}
}

close(): void {
this.closing = true;
this.listener.close();
}

async serveConn(conn: Conn): Promise<void> {
const httpConn = createHttpConn(conn);
const bufr = new BufReader(httpConn);
while (true) {
const [req, err] = await readRequest(httpConn, bufr);
if (err) {
// TODO(ry) This should be more granular. Perhaps return back a 400 or
// 500 error?
httpConn.close();
break;
private async *iterateRequests(): AsyncIterableIterator<ServerRequest> {
while (!this.closing) {
const conn = await this.listener.accept();
const httpConn = createHttpConn(conn);

for await (const [req, err] of iterateHttpRequests(httpConn)) {
if (err) {
// TODO(ry) This should be more granular. Perhaps return back a 400 or
ry marked this conversation as resolved.
Show resolved Hide resolved
// 500 error?
httpConn.close();
break;
}
yield req;
}

await this.channel.send(req);
}
}

async next(): Promise<IteratorResult<ServerRequest>> {
const req = await this.channel.recv();
return { done: false, value: req };
}

[Symbol.asyncIterator](): AsyncIterableIterator<ServerRequest> {
return this;
return this.iterateRequests();
}
}

export function serve(addr: string): Server {
const listener = listen("tcp", addr);
const server = new Server(listener);
server.acceptLoop();
return server;
return new Server(listener);
}

export async function listenAndServe(
Expand Down