Skip to content

Commit

Permalink
Decouple serializing messages w/ writing them to socket (#2155)
Browse files Browse the repository at this point in the history
* Move message writing to typescript lib

* Write more tests, cleanup code to some extent

* Rename package to something more representing its name

* Remove unused code

* Small tweaks based on microbenchmarks

* Rename w/o underscore
  • Loading branch information
brianc authored Apr 9, 2020
1 parent 2013d77 commit 3ff91ea
Show file tree
Hide file tree
Showing 17 changed files with 685 additions and 188 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
],
"scripts": {
"test": "yarn lerna exec yarn test",
"build": "yarn lerna exec --scope pg-packet-stream yarn build",
"build": "yarn lerna exec --scope pg-protocol yarn build",
"pretest": "yarn build",
"lint": "yarn lerna exec --parallel yarn lint"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "pg-packet-stream",
"name": "pg-protocol",
"version": "1.1.0",
"description": "The postgres client/server binary protocol, implemented in TypeScript",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
Expand Down
24 changes: 24 additions & 0 deletions packages/pg-protocol/src/b.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// file for microbenchmarking

import { Writer } from './buffer-writer'
import { serialize } from './index'

const LOOPS = 1000
let count = 0
let start = Date.now()
const writer = new Writer()

const run = () => {
if (count > LOOPS) {
console.log(Date.now() - start)
return;
}
count++
for(let i = 0; i < LOOPS; i++) {
serialize.describe({ type: 'P'})
serialize.describe({ type: 'S'})
}
setImmediate(run)
}

run()
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ const emptyBuffer = Buffer.allocUnsafe(0);

export class BufferReader {
private buffer: Buffer = emptyBuffer;
// TODO(bmc): support non-utf8 encoding

// TODO(bmc): support non-utf8 encoding?
private encoding: string = 'utf-8';

constructor(private offset: number = 0) {
}
public setBuffer(offset: number, buffer: Buffer): void {
Expand Down
87 changes: 87 additions & 0 deletions packages/pg-protocol/src/buffer-writer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//binary data writer tuned for encoding binary specific to the postgres binary protocol

export class Writer {
private buffer: Buffer;
private offset: number = 5;
private headerPosition: number = 0;
constructor(private size = 256) {
this.buffer = Buffer.alloc(size)
}

private ensure(size: number): void {
var remaining = this.buffer.length - this.offset;
if (remaining < size) {
var oldBuffer = this.buffer;
// exponential growth factor of around ~ 1.5
// https://stackoverflow.com/questions/2269063/buffer-growth-strategy
var newSize = oldBuffer.length + (oldBuffer.length >> 1) + size;
this.buffer = Buffer.alloc(newSize);
oldBuffer.copy(this.buffer);
}
}

public addInt32(num: number): Writer {
this.ensure(4);
this.buffer[this.offset++] = (num >>> 24 & 0xFF);
this.buffer[this.offset++] = (num >>> 16 & 0xFF);
this.buffer[this.offset++] = (num >>> 8 & 0xFF);
this.buffer[this.offset++] = (num >>> 0 & 0xFF);
return this;
}

public addInt16(num: number): Writer {
this.ensure(2);
this.buffer[this.offset++] = (num >>> 8 & 0xFF);
this.buffer[this.offset++] = (num >>> 0 & 0xFF);
return this;
}


public addCString(string: string): Writer {
if (!string) {
this.ensure(1);
} else {
var len = Buffer.byteLength(string);
this.ensure(len + 1); // +1 for null terminator
this.buffer.write(string, this.offset, 'utf-8')
this.offset += len;
}

this.buffer[this.offset++] = 0; // null terminator
return this;
}

public addString(string: string = ""): Writer {
var len = Buffer.byteLength(string);
this.ensure(len);
this.buffer.write(string, this.offset);
this.offset += len;
return this;
}

public add(otherBuffer: Buffer): Writer {
this.ensure(otherBuffer.length);
otherBuffer.copy(this.buffer, this.offset);
this.offset += otherBuffer.length;
return this;
}

private join(code?: number): Buffer {
if (code) {
this.buffer[this.headerPosition] = code;
//length is everything in this packet minus the code
const length = this.offset - (this.headerPosition + 1)
this.buffer.writeInt32BE(length, this.headerPosition + 1)
}
return this.buffer.slice(code ? 0 : 5, this.offset);
}

public flush(code?: number): Buffer {
var result = this.join(code);
this.offset = 5;
this.headerPosition = 0;
this.buffer = Buffer.allocUnsafe(this.size)
return result;
}
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import buffers from './testing/test-buffers'
import BufferList from './testing/buffer-list'
import { parse } from './'
import { parse } from '.'
import assert from 'assert'
import { PassThrough } from 'stream'
import { BackendMessage } from './messages'
Expand Down
11 changes: 11 additions & 0 deletions packages/pg-protocol/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { BackendMessage } from './messages';
import { serialize } from './serializer';
import { Parser, MessageCallback } from './parser'

export function parse(stream: NodeJS.ReadableStream, callback: MessageCallback): Promise<void> {
const parser = new Parser()
stream.on('data', (buffer: Buffer) => parser.parse(buffer, callback))
return new Promise((resolve) => stream.on('end', () => resolve()))
}

export { serialize };
File renamed without changes.
Loading

0 comments on commit 3ff91ea

Please sign in to comment.