Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

typed stream update #230

Merged
merged 1 commit into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/jetstream/jsm_readme_jsm_example.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const si = await jsm.streams.info(name);

// update a stream configuration
si.config.subjects?.push("a.b");
await jsm.streams.update(si.config);
await jsm.streams.update(name, si.config);

// get a particular stored message in the stream by sequence
// this is not associated with a consumer
Expand Down
10 changes: 7 additions & 3 deletions nats-base-client/jsmstream_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
StreamInfoRequestOptions,
StreamListResponse,
StreamMsgResponse,
StreamUpdateConfig,
SuccessResponse,
} from "./types.ts";
import { BaseApiClient } from "./jsbaseclient_api.ts";
Expand Down Expand Up @@ -61,10 +62,13 @@ export class StreamAPIImpl extends BaseApiClient implements StreamAPI {
return cr.success;
}

async update(cfg = {} as StreamConfig): Promise<StreamInfo> {
validateStreamName(cfg.name);
async update(
name: string,
cfg = {} as StreamUpdateConfig,
): Promise<StreamInfo> {
validateStreamName(name);
const r = await this._request(
`${this.prefix}.STREAM.UPDATE.${cfg.name}`,
`${this.prefix}.STREAM.UPDATE.${name}`,
cfg,
);
const si = r as StreamInfo;
Expand Down
27 changes: 15 additions & 12 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ export type StreamInfoRequestOptions = {
export interface StreamAPI {
info(stream: string, opts?: StreamInfoRequestOptions): Promise<StreamInfo>;
add(cfg: Partial<StreamConfig>): Promise<StreamInfo>;
update(cfg: StreamConfig): Promise<StreamInfo>;
update(name: string, cfg: StreamUpdateConfig): Promise<StreamInfo>;
purge(stream: string, opts?: PurgeOpts): Promise<PurgeResponse>;
delete(stream: string): Promise<boolean>;
list(): Lister<StreamInfo>;
Expand Down Expand Up @@ -536,29 +536,32 @@ export interface StreamInfo {
sources?: StreamSourceInfo[];
}

export interface StreamConfig {
export interface StreamConfig extends StreamUpdateConfig {
name: string;
description?: string;
subjects?: string[];
retention: RetentionPolicy;
storage: StorageType;
"num_replicas": number;
"template_owner"?: string;
placement?: Placement;
mirror?: StreamSource; // same as a source
sealed: boolean;
"deny_delete": boolean;
"deny_purge": boolean;
}

export interface StreamUpdateConfig {
description?: string;
"max_consumers": number;
"max_msgs_per_subject"?: number;
"max_msgs": number;
"max_bytes": number;
"max_age": Nanos;
"max_bytes": number;
"max_msg_size"?: number;
storage: StorageType;
discard?: DiscardPolicy;
"num_replicas": number;
"no_ack"?: boolean;
"template_owner"?: string;
"duplicate_window"?: Nanos;
placement?: Placement;
mirror?: StreamSource; // same as a source
sources?: StreamSource[];
sealed: boolean;
"deny_delete": boolean;
"deny_purge": boolean;
"allow_rollup_hdrs": boolean;
}

Expand Down
2 changes: 1 addition & 1 deletion tests/jetstream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2346,7 +2346,7 @@ Deno.test("jetstream - seal", async () => {
await jsm.streams.deleteMessage(stream, 1);

si.config.sealed = true;
const usi = await jsm.streams.update(si.config);
const usi = await jsm.streams.update(stream, si.config);
assertEquals(usi.config.sealed, true);

await assertThrowsAsync(
Expand Down
10 changes: 5 additions & 5 deletions tests/jsm_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ Deno.test("jsm - empty stream config update fails", async () => {

await assertThrowsAsync(
async () => {
await jsm.streams.update({} as StreamConfig);
await jsm.streams.update("", {} as StreamConfig);
},
Error,
StreamNameRequired,
);
ci!.config!.subjects!.push("foo");
ci = await jsm.streams.update(ci.config);
ci = await jsm.streams.update(name, ci.config);
assertEquals(ci!.config!.subjects!.length, 2);
await cleanup(ns, nc);
});
Expand Down Expand Up @@ -631,7 +631,7 @@ Deno.test("jsm - update stream", async () => {
assertEquals(si.config!.subjects!.length, 1);

si.config!.subjects!.push("foo");
si = await jsm.streams.update(si.config);
si = await jsm.streams.update(stream, si.config);
assertEquals(si.config!.subjects!.length, 2);
await cleanup(ns, nc);
});
Expand Down Expand Up @@ -804,9 +804,9 @@ Deno.test("jsm - cross account streams", async () => {
assertEquals(si.state.messages, 0);

// update
const config = streams[0].config;
const config = streams[0].config as StreamConfig;
config.subjects!.push(`${stream}.B`);
si = await jsm.streams.update(config);
si = await jsm.streams.update(config.name, config);
assertEquals(si.config.subjects!.length, 2);

// find
Expand Down