Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bun.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"name": "@s2-dev/streamstore",
"dependencies": {
"@protobuf-ts/runtime": "^2.11.1",
"debug": "^4.4.3",
},
"devDependencies": {
"@arethetypeswrong/cli": "^0.18.2",
Expand All @@ -13,6 +14,7 @@
"@hey-api/openapi-ts": "^0.86.0",
"@protobuf-ts/plugin": "^2.11.1",
"@types/bun": "^1.3.1",
"@types/debug": "^4.1.12",
"openapi-typescript": "^7.10.1",
"protoc": "^33.0.0",
"typedoc": "^0.28.14",
Expand Down Expand Up @@ -250,6 +252,8 @@

"@types/chai": ["@types/[email protected]", "", { "dependencies": { "@types/deep-eql": "*", "assertion-error": "^2.0.1" } }, "sha512-Mw558oeA9fFbv65/y4mHtXDs9bPnFMZAL/jxdPFUpOHHIXX91mcgEHbS5Lahr+pwZFR8A7GQleRWeI6cGFC2UA=="],

"@types/debug": ["@types/[email protected]", "", { "dependencies": { "@types/ms": "*" } }, "sha512-vIChWdVG3LG1SMxEvI/AK+FWJthlrqlTu7fbrlywTkkaONwk/UAGaULXRlf8vkzFBLVm0zkMdCquhL5aOjhXPQ=="],

"@types/deep-eql": ["@types/[email protected]", "", {}, "sha512-c9h9dVVMigMPc4bwTvC5dxqtqJZwQPePsWjPlpSOnojbor6pGqdk541lfA7AqFQr5pB1BRdq0juY9db81BwyFw=="],

"@types/estree": ["@types/[email protected]", "", {}, "sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w=="],
Expand All @@ -258,6 +262,8 @@

"@types/json-schema": ["@types/[email protected]", "", {}, "sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA=="],

"@types/ms": ["@types/[email protected]", "", {}, "sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA=="],

"@types/node": ["@types/[email protected]", "", { "dependencies": { "undici-types": "~7.16.0" } }, "sha512-uWN8YqxXxqFMX2RqGOrumsKeti4LlmIMIyV0lgut4jx7KQBcBiW6vkDtIBvHnHIquwNfJhk8v2OtmO8zXWHfPA=="],

"@types/react": ["@types/[email protected]", "", { "dependencies": { "csstype": "^3.0.2" } }, "sha512-6mDvHUFSjyT2B2yeNx2nUgMxh9LtOWvkhIU3uePn2I2oyNymUAX1NIsdgviM4CH+JSrp2D2hsMvJOkxY+0wNRA=="],
Expand Down
37 changes: 35 additions & 2 deletions examples/image.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ function rechunkStream(

const s2 = new S2({
accessToken: process.env.S2_ACCESS_TOKEN!,
retry: {
maxAttempts: 10,
retryBackoffDurationMs: 100,
appendRetryPolicy: "noSideEffects",
},
});

const basinName = process.env.S2_BASIN;
Expand All @@ -45,12 +50,26 @@ const stream = basin.stream("image");
const startAt = await stream.checkTail();

const session = await stream.appendSession({
maxQueuedBytes: 1024 * 1024 * 10,
maxQueuedBytes: 1024 * 1024, // 1MiB
});
let image = await fetch(
"https://upload.wikimedia.org/wikipedia/commons/2/24/Peter_Paul_Rubens_-_Self-portrait_-_RH.S.180_-_Rubenshuis_%28after_restoration%29.jpg",
);

function mapWithIndexAsync<T, U>(
fn: (value: T, index: number) => Promise<U> | U,
): TransformStream<T, U> {
let index = 0;

return new TransformStream<T, U>({
async transform(chunk, controller) {
const out = await fn(chunk, index);
index += 1;
controller.enqueue(out);
},
});
}

// Write directly from fetch response to S2 stream
let append = await image
.body! // Ensure each chunk is at most 128KiB. S2 has a maximum individual record size of 1MiB.
Expand All @@ -63,10 +82,24 @@ let append = await image
},
}),
)
.pipeThrough(
mapWithIndexAsync(
(record, index) =>
({
...record,
headers: [
[
new TextEncoder().encode("index"),
new TextEncoder().encode(index.toString()),
],
],
}) as AppendRecord,
),
)
// Collect records into batches.
.pipeThrough(
new BatchTransform({
lingerDurationMillis: 50,
lingerDurationMillis: 5,
match_seq_num: startAt.tail.seq_num,
}),
)
Expand Down
2 changes: 1 addition & 1 deletion examples/read.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ if (streams.streams[0]) {

for await (const record of readSession) {
console.log(`[seq ${record.seq_num}] ${record.body}`);
console.log("new tail", readSession.lastReadPosition()?.seq_num);
console.log("new tail", readSession.nextReadPosition()?.seq_num);
}
console.log("Done reading");
}
69 changes: 69 additions & 0 deletions examples/throughput.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { createWriteStream } from "node:fs";
import {
AppendRecord,
BatchTransform,
type ReadRecord,
S2,
} from "../src/index.js";

function createStringStream(
n: number,
delayMs: number = 0,
): ReadableStream<string> {
let count = 0;
return new ReadableStream<string>({
async pull(controller) {
if (count < n) {
if (delayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
console.log("pull", count);
const randomChars = Array.from({ length: 1024 * 10 }, () =>
String.fromCharCode(97 + Math.floor(Math.random() * 26)),
).join("");

var str = `${count} ${randomChars}`;
controller.enqueue(str);
count++;
} else {
controller.close();
}
},
});
}

const s2 = new S2({
accessToken: process.env.S2_ACCESS_TOKEN!,
retry: {
maxAttempts: 10,
retryBackoffDurationMs: 100,
appendRetryPolicy: "noSideEffects",
requestTimeoutMillis: 10000,
},
});

const basinName = process.env.S2_BASIN;
if (!basinName) {
console.error("S2_BASIN environment variable is not set");
process.exit(1);
}

const basin = s2.basin(basinName!);
const stream = basin.stream("throughput");

const sesh = await stream.appendSession({ maxQueuedBytes: 1024 * 1024 * 5 });

createStringStream(1000000, 0)
.pipeThrough(
new TransformStream<string, AppendRecord>({
transform(arr, controller) {
controller.enqueue(AppendRecord.make(arr));
},
}),
)
.pipeThrough(
new BatchTransform({
lingerDurationMillis: 100,
}),
)
.pipeTo(sesh.writable);
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
"LICENSE"
],
"dependencies": {
"@protobuf-ts/runtime": "^2.11.1"
"@protobuf-ts/runtime": "^2.11.1",
"debug": "^4.4.3"
},
"devDependencies": {
"@arethetypeswrong/cli": "^0.18.2",
Expand All @@ -54,12 +55,14 @@
"@hey-api/openapi-ts": "^0.86.0",
"@protobuf-ts/plugin": "^2.11.1",
"@types/bun": "^1.3.1",
"@types/debug": "^4.1.12",
"openapi-typescript": "^7.10.1",
"protoc": "^33.0.0",
"typedoc": "^0.28.14",
"vitest": "^4.0.2"
},
"peerDependencies": {
"typescript": "^5.9.3"
}
},
"packageManager": "[email protected]+sha512.5ea8b0deed94ed68691c9bad4c955492705c5eeb8a87ef86bc62c74a26b037b08ff9570f108b2e4dbd1dd1a9186fea925e527f141c648e85af45631074680184"
}
75 changes: 30 additions & 45 deletions src/accessTokens.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { DataToObject, S2RequestOptions } from "./common.js";
import { S2Error } from "./error.js";
import type { DataToObject, RetryConfig, S2RequestOptions } from "./common.js";
import { S2Error, withS2Data } from "./error.js";
import type { Client } from "./generated/client/types.gen.js";
import {
type IssueAccessTokenData,
Expand All @@ -9,6 +9,7 @@ import {
type RevokeAccessTokenData,
revokeAccessToken,
} from "./generated/index.js";
import { withRetries } from "./lib/retry.js";

export interface ListAccessTokensArgs
extends DataToObject<ListAccessTokensData> {}
Expand All @@ -19,9 +20,11 @@ export interface RevokeAccessTokenArgs

export class S2AccessTokens {
readonly client: Client;
private readonly retryConfig?: RetryConfig;

constructor(client: Client) {
constructor(client: Client, retryConfig?: RetryConfig) {
this.client = client;
this.retryConfig = retryConfig;
}

/**
Expand All @@ -32,21 +35,15 @@ export class S2AccessTokens {
* @param args.limit Max results (up to 1000)
*/
public async list(args?: ListAccessTokensArgs, options?: S2RequestOptions) {
const response = await listAccessTokens({
client: this.client,
query: args,
...options,
return await withRetries(this.retryConfig, async () => {
return await withS2Data(() =>
listAccessTokens({
client: this.client,
query: args,
...options,
}),
);
});

if (response.error) {
throw new S2Error({
message: response.error.message,
code: response.error.code ?? undefined,
status: response.response.status,
});
}

return response.data;
}

/**
Expand All @@ -58,21 +55,15 @@ export class S2AccessTokens {
* @param args.expires_at Expiration in ISO 8601; defaults to requestor's token expiry
*/
public async issue(args: IssueAccessTokenArgs, options?: S2RequestOptions) {
const response = await issueAccessToken({
client: this.client,
body: args,
...options,
return await withRetries(this.retryConfig, async () => {
return await withS2Data(() =>
issueAccessToken({
client: this.client,
body: args,
...options,
}),
);
});

if (response.error) {
throw new S2Error({
message: response.error.message,
code: response.error.code ?? undefined,
status: response.response.status,
});
}

return response.data;
}

/**
Expand All @@ -81,20 +72,14 @@ export class S2AccessTokens {
* @param args.id Token ID to revoke
*/
public async revoke(args: RevokeAccessTokenArgs, options?: S2RequestOptions) {
const response = await revokeAccessToken({
client: this.client,
path: args,
...options,
return await withRetries(this.retryConfig, async () => {
return await withS2Data(() =>
revokeAccessToken({
client: this.client,
path: args,
...options,
}),
);
});

if (response.error) {
throw new S2Error({
message: response.error.message,
code: response.error.code ?? undefined,
status: response.response.status,
});
}

return response.data;
}
}
23 changes: 18 additions & 5 deletions src/basin.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { RetryConfig } from "./common.js";
import { createClient, createConfig } from "./generated/client/index.js";
import type { Client } from "./generated/client/types.gen.js";
import * as Redacted from "./lib/redacted.js";
Expand All @@ -8,6 +9,7 @@ import { S2Streams } from "./streams.js";
export class S2Basin {
private readonly client: Client;
private readonly transportConfig: TransportConfig;
private readonly retryConfig?: RetryConfig;
public readonly name: string;
public readonly streams: S2Streams;

Expand All @@ -19,19 +21,24 @@ export class S2Basin {
* @param accessToken Redacted access token from the parent `S2` client
* @param baseUrl Base URL for the basin (e.g. `https://my-basin.b.aws.s2.dev/v1`)
* @param includeBasinHeader Include the `S2-Basin` header with the request
* @param retryConfig Retry configuration inherited from parent S2 client
*/
constructor(
name: string,
options: {
accessToken: Redacted.Redacted;
baseUrl: string;
includeBasinHeader: boolean;
retryConfig?: RetryConfig;
},
) {
this.name = name;
this.retryConfig = options.retryConfig;
this.transportConfig = {
baseUrl: options.baseUrl,
accessToken: options.accessToken,
basinName: options.includeBasinHeader ? name : undefined,
retry: options.retryConfig,
};
this.client = createClient(
createConfig({
Expand All @@ -40,18 +47,24 @@ export class S2Basin {
headers: options.includeBasinHeader ? { "s2-basin": name } : {},
}),
);
this.streams = new S2Streams(this.client);

this.streams = new S2Streams(this.client, this.retryConfig);
}

/**
* Create a stream-scoped helper bound to `this` basin.
* @param name Stream name
*/
public stream(name: string, options?: StreamOptions) {
return new S2Stream(name, this.client, {
...this.transportConfig,
forceTransport: options?.forceTransport,
});
return new S2Stream(
name,
this.client,
{
...this.transportConfig,
forceTransport: options?.forceTransport,
},
this.retryConfig,
);
}
}

Expand Down
Loading