Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queued channel failure logic #436

Merged
merged 7 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/channel/guaranteedChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class GuaranteedChannel<M, S> implements OutputChannel<M> {
() => {
if (this.closed) {
// Cancel delay immediately when the channel is closed
abort(MessageDeliveryError.nonRetryable('Connection deliberately closed.'));
abort(MessageDeliveryError.retryable('Connection deliberately closed.'));
}
},
0,
Expand Down
57 changes: 42 additions & 15 deletions src/channel/queuedChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,10 @@ export class QueuedChannel<T> implements OutputChannel<T> {
}

if (this.pending === undefined) {
this.pending = this.queue.isEmpty()
? Promise.resolve()
: Promise.reject(MessageDeliveryError.retryable('The queue must be flushed.'));
this.pending = this.requeue();
}

this.enqueue(message);

this.pending = this.pending.then(
() => this.channel
.publish(message)
.then(this.dequeue.bind(this)),
);
this.pending = this.chainNext(this.pending, message, true);

return this.pending;
}
Expand Down Expand Up @@ -86,16 +78,51 @@ export class QueuedChannel<T> implements OutputChannel<T> {
this.logger.debug(`Queue length: ${length}`);

for (const message of this.queue.all()) {
this.pending = this.pending.then(
() => this.channel
.publish(message)
.then(this.dequeue.bind(this)),
);
this.pending = this.chainNext(this.pending, message);
}

return this.pending;
}

private async chainNext(previous: Promise<void>, message: T, enqueue = false): Promise<void> {
marcospassos marked this conversation as resolved.
Show resolved Hide resolved
marcospassos marked this conversation as resolved.
Show resolved Hide resolved
if (enqueue) {
this.enqueue(message);
}

try {
await previous;
} catch (error) {
if (error instanceof MessageDeliveryError && error.retryable) {
// If the previous message failed to deliver, requeue all messages
// including the current one that was just enqueued
return this.requeue();
}

throw error;
}

try {
const result = await this.channel.publish(message);

this.dequeue();

return result;
} catch (error) {
if (!(error instanceof MessageDeliveryError) || !error.retryable) {
// Discard the message if it's non-retryable
this.dequeue();

if (!enqueue) {
// If the message was not enqueued, suppress the error
// so that the next message in the queue can be immediately
return;
}
}

throw error;
}
}

public async close(): Promise<void> {
this.closed = true;

Expand Down
4 changes: 2 additions & 2 deletions src/channel/retryChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class RetryChannel<T> implements OutputChannel<T> {

while (this.retryPolicy.shouldRetry(attempt, message, error)) {
if (this.closed) {
throw MessageDeliveryError.nonRetryable('Connection deliberately closed.');
throw MessageDeliveryError.retryable('Connection deliberately closed.');
}

const delay = this.retryPolicy.getDelay(attempt);
Expand All @@ -59,7 +59,7 @@ export class RetryChannel<T> implements OutputChannel<T> {
// Cancel delay immediately when the channel is closed
window.clearInterval(closeWatcher);

reject(MessageDeliveryError.nonRetryable('Connection deliberately closed.'));
reject(MessageDeliveryError.retryable('Connection deliberately closed.'));
}
},
0,
Expand Down
2 changes: 1 addition & 1 deletion test/channel/guaranteedChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ describe('A guaranteed channel', () => {
await channel.close();

await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.');
await expect(promise).rejects.toHaveProperty('retryable', false);
await expect(promise).rejects.toHaveProperty('retryable', true);
});

it('should close the output channel on close', async () => {
Expand Down
181 changes: 163 additions & 18 deletions test/channel/queuedChannel.test.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
import {InMemoryQueue, CapacityRestrictedQueue} from '../../src/queue';
import {QueuedChannel, OutputChannel, MessageDeliveryError} from '../../src/channel';
import {Logger} from '../../src/logging';

describe('A queued channel', () => {
afterEach(() => {
jest.restoreAllMocks();
});

it('should resume flushing from the last failed message', async () => {
it('should resume flushing from the oldest message', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new MessageDeliveryError('Rejected', true))
.mockResolvedValue(undefined),
};
const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo', 'bar'));
const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.flush()).rejects.toThrowWithMessage(MessageDeliveryError, 'Rejected');

await expect(channel.flush()).rejects.toEqual(expect.any(Error));
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(queue.isEmpty()).toBe(false);
expect(queue.peek()).toBe('bar');

await channel.flush();

expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');
expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(queue.isEmpty()).toBe(true);

await channel.flush();
await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'baz');

expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(outputChannel.publish).toHaveBeenCalledTimes(4);
expect(queue.isEmpty()).toBe(true);
});

it('should do nothing when flushing an empty queue', async () => {
Expand Down Expand Up @@ -123,30 +132,30 @@ describe('A queued channel', () => {
await expect(promise).rejects.toHaveProperty('retryable', false);
});

it('should fail to publish messages if queue has pending messages', async () => {
it('should automatically requeue messages on the first publish', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn().mockResolvedValue(undefined),
};
const channel = new QueuedChannel(outputChannel, new InMemoryQueue('foo'));

const promise = channel.publish('bar');

await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'The queue must be flushed.');
await expect(promise).rejects.toHaveProperty('retryable', true);
const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await channel.flush();
await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz');

await channel.publish('baz');
expect(queue.isEmpty()).toBe(true);

expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz');
expect(outputChannel.publish).toHaveBeenCalledTimes(3);
await channel.publish('qux');

expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'qux');
expect(outputChannel.publish).toHaveBeenCalledTimes(4);
});

it('should publish messages if queue has no pending messages', async () => {
it('should publish messages immediately if queue has no pending messages', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn().mockResolvedValue(undefined),
Expand All @@ -166,6 +175,142 @@ describe('A queued channel', () => {
expect(outputChannel.publish).toHaveBeenCalledTimes(2);
});

it('should require a flush after a failed, non-retryable message', async () => {
const logger: Logger = {
debug: jest.fn(),
info: jest.fn(),
warn: jest.fn(),
error: jest.fn(),
};

const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockImplementationOnce(
() => new Promise((_, reject) => {
setTimeout(() => reject(new Error('Failed')), 1);
}),
)
.mockResolvedValue(undefined),
};

const channel = new QueuedChannel(outputChannel, new InMemoryQueue(), logger);

await expect(channel.publish('foo')).rejects.toEqual(expect.any(Error));
await expect(channel.publish('bar')).rejects.toEqual(expect.any(Error));

expect(outputChannel.publish).toHaveBeenCalledTimes(1);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');

await channel.flush();

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');
});

it('should not require flush for processing re-enqueued messages', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockRejectedValueOnce(new Error('Failed'))
.mockRejectedValueOnce(new Error('Failed'))
.mockResolvedValue(undefined),
};

const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'baz');

expect(queue.isEmpty()).toBe(true);
});

it('should flush all non-retryable messages even if an error occurs', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockRejectedValueOnce(new Error('Failed'))
.mockRejectedValueOnce(new Error('Failed')),
};

const queue = new InMemoryQueue('foo', 'bar');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.flush()).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(queue.isEmpty()).toBe(true);
});

it('should not dequeue messages if an retryable error occurs', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new MessageDeliveryError('Rejected', true))
.mockResolvedValueOnce(undefined),
};

const queue = new InMemoryQueue('foo');
const channel = new QueuedChannel(outputChannel, queue);

const promise = channel.publish('bar');

await expect(promise).rejects.toEqual(expect.any(MessageDeliveryError));

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(queue.length()).toBe(1);
expect(queue.peek()).toBe('bar');

await expect(channel.flush()).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(3);
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');

expect(queue.isEmpty()).toBe(true);
});

it('should requeue messages if a retryable error occurs', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
publish: jest.fn()
.mockResolvedValueOnce(undefined)
.mockRejectedValueOnce(new MessageDeliveryError('Rejected', true))
.mockResolvedValue(undefined),
};

const queue = new InMemoryQueue('foo');
const channel = new QueuedChannel(outputChannel, queue);

await expect(channel.publish('bar')).rejects.toEqual(expect.any(MessageDeliveryError));

expect(outputChannel.publish).toHaveBeenCalledTimes(2);
expect(outputChannel.publish).toHaveBeenNthCalledWith(1, 'foo');
expect(outputChannel.publish).toHaveBeenNthCalledWith(2, 'bar');

expect(queue.length()).toBe(1);
expect(queue.peek()).toBe('bar');

await expect(channel.publish('baz')).resolves.toBeUndefined();

expect(outputChannel.publish).toHaveBeenCalledTimes(4);
expect(outputChannel.publish).toHaveBeenNthCalledWith(3, 'bar');
expect(outputChannel.publish).toHaveBeenNthCalledWith(4, 'baz');

expect(queue.isEmpty()).toBe(true);
});

it('should close the output channel and wait for pending messages', async () => {
const outputChannel: OutputChannel<string> = {
close: jest.fn().mockResolvedValue(undefined),
Expand Down
4 changes: 2 additions & 2 deletions test/channel/retryChannel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('A retry channel', () => {
await channel.close();

await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.');
await expect(promise).rejects.toHaveProperty('retryable', false);
await expect(promise).rejects.toHaveProperty('retryable', true);
});

it('should fail to publish a message if the channel is closed while retrying', async () => {
Expand All @@ -96,7 +96,7 @@ describe('A retry channel', () => {
await channel.close();

await expect(promise).rejects.toThrowWithMessage(MessageDeliveryError, 'Connection deliberately closed.');
await expect(promise).rejects.toHaveProperty('retryable', false);
await expect(promise).rejects.toHaveProperty('retryable', true);
});

it('should fail to publish a message if maximum retry attempts is reached', async () => {
Expand Down