Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose permission errors at the request api level #296

Merged
merged 2 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nats-base-client/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export class NatsError extends Error {
name: string;
message: string;
code: string;
permissionContext?: { operation: string; subject: string };
chainedError?: Error;
// these are for supporting jetstream
api_error?: ApiError;
Expand Down
2 changes: 1 addition & 1 deletion nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export class JsMsgImpl implements JsMsg {
if (this.msg.reply) {
const mi = this.msg as MsgImpl;
const proto = mi.publisher as unknown as ProtocolHandler;
const r = new Request(proto.muxSubscriptions);
const r = new Request(proto.muxSubscriptions, this.msg.reply);
proto.request(r);
try {
proto.publish(
Expand Down
27 changes: 27 additions & 0 deletions nats-base-client/muxsubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,33 @@ export class MuxSubscription {
return null;
}

all(): Request[] {
return Array.from(this.reqs.values());
}

handleError(isMuxPermissionError: boolean, err?: NatsError): boolean {
if (err && err.permissionContext) {
if (isMuxPermissionError) {
// one or more requests queued but mux cannot process them
this.all().forEach((r) => {
r.resolver(err, {} as Msg);
});
return true;
}
const ctx = err.permissionContext;
if (ctx.operation === "publish") {
const req = this.all().find((s) => {
return s.requestSubject === ctx.subject;
});
if (req) {
req.resolver(err, {} as Msg);
return true;
}
}
}
return false;
}

dispatcher() {
return (err: NatsError | null, m: Msg) => {
const token = this.getToken(m);
Expand Down
5 changes: 3 additions & 2 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export class NatsConnectionImpl implements NatsConnection {
: createInbox(this.options.inboxPrefix);
const d = deferred<Msg>();
const errCtx = new Error();
this.subscribe(
const sub = this.subscribe(
inbox,
{
max: 1,
Expand All @@ -198,10 +198,11 @@ export class NatsConnectionImpl implements NatsConnection {
},
},
);
(sub as SubscriptionImpl).requestSubject = subject;
this.protocol.publish(subject, data, { reply: inbox });
return d;
} else {
const r = new Request(this.protocol.muxSubscriptions, opts);
const r = new Request(this.protocol.muxSubscriptions, subject, opts);
this.protocol.request(r);

try {
Expand Down
24 changes: 22 additions & 2 deletions nats-base-client/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,15 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
static toError(s: string) {
const t = s ? s.toLowerCase() : "";
if (t.indexOf("permissions violation") !== -1) {
return new NatsError(s, ErrorCode.PermissionsViolation);
const err = new NatsError(s, ErrorCode.PermissionsViolation);
const m = s.match(/(Publish|Subscription) to "(\S+)"/);
if (m) {
err.permissionContext = {
operation: m[1].toLowerCase(),
subject: m[2],
};
}
return err;
} else if (t.indexOf("authorization violation") !== -1) {
return new NatsError(s, ErrorCode.AuthorizationViolation);
} else if (t.indexOf("user authentication expired") !== -1) {
Expand Down Expand Up @@ -421,8 +429,20 @@ export class ProtocolHandler implements Dispatcher<ParserEvent> {
async processError(m: Uint8Array) {
const s = decode(m);
const err = ProtocolHandler.toError(s);
let isMuxPermissionError = false;
const status: Status = { type: Events.Error, data: err.code };
if (err.permissionContext) {
status.permissionContext = err.permissionContext;
const mux = this.subscriptions.getMux();
isMuxPermissionError = mux?.subject === err.permissionContext.subject;
}
this.subscriptions.handleError(err);
this.dispatchStatus({ type: Events.Error, data: err.code });
this.muxSubscriptions.handleError(isMuxPermissionError, err);
if (isMuxPermissionError) {
// remove the permission - enable it to be recreated
this.subscriptions.setMux(null);
}
this.dispatchStatus(status);
await this.handleError(err);
}

Expand Down
3 changes: 3 additions & 0 deletions nats-base-client/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ export class Request {
deferred: Deferred<Msg>;
timer: Timeout<Msg>;
ctx: Error;
requestSubject: string;
private mux: MuxSubscription;

constructor(
mux: MuxSubscription,
requestSubject: string,
opts: RequestOptions = { timeout: 1000 },
) {
this.mux = mux;
this.requestSubject = requestSubject;
this.received = 0;
this.deferred = deferred();
this.token = nuid.next();
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
info?: unknown;
cleanupFn?: (sub: Subscription, info?: unknown) => void;
closed: Deferred<void>;
requestSubject?: string;

constructor(
protocol: ProtocolHandler,
Expand Down
43 changes: 23 additions & 20 deletions nats-base-client/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import type { NatsError } from "./error.ts";
import type { Msg } from "./types.ts";

export class Subscriptions {
mux!: SubscriptionImpl;
mux: SubscriptionImpl | null;
subs: Map<number, SubscriptionImpl>;
sidCounter: number;

constructor() {
this.sidCounter = 0;
this.mux = null;
this.subs = new Map<number, SubscriptionImpl>();
}

Expand All @@ -37,7 +38,7 @@ export class Subscriptions {
return s;
}

setMux(s: SubscriptionImpl): SubscriptionImpl {
setMux(s: SubscriptionImpl | null): SubscriptionImpl | null {
this.mux = s;
return s;
}
Expand All @@ -59,11 +60,7 @@ export class Subscriptions {
}

all(): (SubscriptionImpl)[] {
const buf = [];
for (const s of this.subs.values()) {
buf.push(s);
}
return buf;
return Array.from(this.subs.values());
}

cancel(s: SubscriptionImpl): void {
Expand All @@ -74,22 +71,28 @@ export class Subscriptions {
}

handleError(err?: NatsError): boolean {
let handled = false;
if (err) {
const re = /^'Permissions Violation for Subscription to "(\S+)"'/i;
const ma = re.exec(err.message);
if (ma) {
const subj = ma[1];
this.subs.forEach((sub) => {
if (subj == sub.subject) {
sub.callback(err, {} as Msg);
sub.close();
handled = sub !== this.mux;
}
if (err && err.permissionContext) {
const ctx = err.permissionContext;
const subs = this.all();
let sub;
if (ctx.operation === "subscription") {
sub = subs.find((s) => {
return s.subject === ctx.subject;
});
}
if (ctx.operation === "publish") {
// we have a no mux subscription
sub = subs.find((s) => {
return s.requestSubject === ctx.subject;
});
}
if (sub) {
sub.callback(err, {} as Msg);
sub.close();
return sub !== this.mux;
}
}
return handled;
return false;
}

close() {
Expand Down
1 change: 1 addition & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export enum Events {
export interface Status {
type: Events | DebugEvents;
data: string | ServersChanged | number;
permissionContext?: { operation: string; subject: string };
}

export enum DebugEvents {
Expand Down
Loading