Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion sdk/servicebus/service-bus/src/core/batchingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ export class BatchingReceiverLite {
this._createServiceBusMessage = (context: MessageAndDelivery) => {
return new ServiceBusMessageImpl(
_connectionContext,
entityPath,
context.message!,
context.delivery!,
true,
Expand Down
1 change: 0 additions & 1 deletion sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,6 @@ export class ManagementClient extends LinkEntity<RequestResponseLink> {
const decodedMessage = RheaMessageUtil.decode(msg.message);
const message = new ServiceBusMessageImpl(
this._context,
this.entityPath,
decodedMessage as any,
{ tag: msg["lock-token"] } as any,
false,
Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/service-bus/src/core/streamingReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { receiverLogger as logger } from "../log";
import { AmqpError, EventContext, OnAmqpEvent } from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
import { AbortSignalLike } from "@azure/abort-controller";
import { abandonMessage, completeMessage } from "../receivers/shared";

/**
* @internal
Expand Down Expand Up @@ -222,7 +223,6 @@ export class StreamingReceiver extends MessageReceiver {

const bMessage: ServiceBusMessageImpl = new ServiceBusMessageImpl(
this._context,
this.entityPath,
context.message!,
context.delivery!,
true,
Expand Down Expand Up @@ -279,7 +279,7 @@ export class StreamingReceiver extends MessageReceiver {
this.name,
error
);
await bMessage.abandon();
await abandonMessage(bMessage, this._context, entityPath);
} catch (abandonError) {
const translatedError = translate(abandonError);
logger.logError(
Expand Down Expand Up @@ -316,7 +316,7 @@ export class StreamingReceiver extends MessageReceiver {
this.logPrefix,
bMessage.messageId
);
await bMessage.complete();
await completeMessage(bMessage, this._context, entityPath);
} catch (completeError) {
const translatedError = translate(completeError);
logger.logError(
Expand Down
64 changes: 53 additions & 11 deletions sdk/servicebus/service-bus/src/receivers/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { ServiceBusReceivedMessage } from "../serviceBusMessage";
import { ConnectionContext } from "../connectionContext";
import {
getAlreadyReceivingErrorMsg,
getErrorMessageNotSupportedInReceiveAndDeleteMode,
getReceiverClosedErrorMsg,
throwErrorIfConnectionClosed,
throwTypeErrorIfParameterMissing,
Expand All @@ -22,7 +23,15 @@ import {
import { OnError, OnMessage, ReceiveOptions } from "../core/messageReceiver";
import { StreamingReceiverInitArgs, StreamingReceiver } from "../core/streamingReceiver";
import { BatchingReceiver } from "../core/batchingReceiver";
import { assertValidMessageHandlers, getMessageIterator, wrapProcessErrorHandler } from "./shared";
import {
abandonMessage,
assertValidMessageHandlers,
completeMessage,
deadLetterMessage,
deferMessage,
getMessageIterator,
wrapProcessErrorHandler
} from "./shared";
import Long from "long";
import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage";
import { Constants, RetryConfig, RetryOperationType, RetryOptions, retry } from "@azure/core-amqp";
Expand Down Expand Up @@ -611,38 +620,71 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
};
}

completeMessage(message: ServiceBusReceivedMessage): Promise<void> {
async completeMessage(message: ServiceBusReceivedMessage): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.complete();
return completeMessage(msgImpl, this._context, this.entityPath);
}

abandonMessage(
async abandonMessage(
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.abandon(propertiesToModify);
return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}

deferMessage(
async deferMessage(
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.defer(propertiesToModify);
return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}

deadLetterMessage(
async deadLetterMessage(
message: ServiceBusReceivedMessage,
options?: DeadLetterOptions & { [key: string]: any }
): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.deadLetter(options);
return deadLetterMessage(msgImpl, this._context, this.entityPath, options);
}

renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date> {
async renewMessageLock(message: ServiceBusReceivedMessage): Promise<Date> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.renewLock();
if (!msgImpl.delivery) {
throw new Error("A peeked message does not have a lock to be renewed.");
}

let associatedLinkName: string | undefined;
let error: Error | undefined;
if (!message.lockToken) {
error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`renew the lock on the message`)
);
} else if (msgImpl.delivery.remote_settled) {
error = new Error(`Failed to renew the lock as this message is already settled.`);
}
if (error) {
logger.logError(
error,
"[%s] An error occurred when renewing the lock on the message with id '%s'",
this._context.connectionId,
message.messageId
);
throw error;
}

if (msgImpl.delivery.link) {
const associatedReceiver = this._context.getReceiverFromCache(msgImpl.delivery.link.name);
associatedLinkName = associatedReceiver?.name;
}
return this._context
.getManagementClient(this.entityPath)
.renewLock(message.lockToken!, { associatedLinkName })
.then((lockedUntil) => {
message.lockedUntilUtc = lockedUntil;
return lockedUntil;
});
}

async close(): Promise<void> {
Expand Down
18 changes: 13 additions & 5 deletions sdk/servicebus/service-bus/src/receivers/sessionReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,15 @@ import {
throwTypeErrorIfParameterNotLong
} from "../util/errors";
import { OnError, OnMessage } from "../core/messageReceiver";
import { assertValidMessageHandlers, getMessageIterator, wrapProcessErrorHandler } from "./shared";
import {
abandonMessage,
assertValidMessageHandlers,
completeMessage,
deadLetterMessage,
deferMessage,
getMessageIterator,
wrapProcessErrorHandler
} from "./shared";
import { defaultMaxTimeAfterFirstMessageForBatchingMs, ServiceBusReceiver } from "./receiver";
import Long from "long";
import { ServiceBusMessageImpl, DeadLetterOptions } from "../serviceBusMessage";
Expand Down Expand Up @@ -485,31 +493,31 @@ export class ServiceBusSessionReceiverImpl implements ServiceBusSessionReceiver

async completeMessage(message: ServiceBusReceivedMessage): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.complete();
return completeMessage(msgImpl, this._context, this.entityPath);
}

async abandonMessage(
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.abandon(propertiesToModify);
return abandonMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}

async deferMessage(
message: ServiceBusReceivedMessage,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.defer(propertiesToModify);
return deferMessage(msgImpl, this._context, this.entityPath, propertiesToModify);
}

async deadLetterMessage(
message: ServiceBusReceivedMessage,
options?: DeadLetterOptions & { [key: string]: any }
): Promise<void> {
const msgImpl = message as ServiceBusMessageImpl;
return msgImpl.deadLetter(options);
return deadLetterMessage(msgImpl, this._context, this.entityPath, options);
}

async renewMessageLock(): Promise<Date> {
Expand Down
166 changes: 165 additions & 1 deletion sdk/servicebus/service-bus/src/receivers/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,16 @@ import { MessageHandlers, ProcessErrorArgs } from "../models";
import { ServiceBusReceiver } from "./receiver";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { receiverLogger, ServiceBusLogger } from "../log";
import { ServiceBusReceivedMessage } from "../serviceBusMessage";
import {
DeadLetterOptions,
DispositionType,
ServiceBusMessageImpl,
ServiceBusReceivedMessage
} from "../serviceBusMessage";
import { DispositionStatusOptions } from "../core/managementClient";
import { ConnectionContext } from "../connectionContext";
import { getErrorMessageNotSupportedInReceiveAndDeleteMode } from "../util/errors";
import { ErrorNameConditionMapper, translate } from "@azure/core-amqp";

/**
* @internal
Expand Down Expand Up @@ -58,3 +67,158 @@ export function wrapProcessErrorHandler(
}
};
}

export function completeMessage(
message: ServiceBusMessageImpl,
context: ConnectionContext,
entityPath: string
): Promise<void> {
receiverLogger.verbose(
"[%s] Completing the message with id '%s'.",
context.connectionId,
message.messageId
);
return settleMessage(message, DispositionType.complete, context, entityPath);
}

export function abandonMessage(
message: ServiceBusMessageImpl,
context: ConnectionContext,
entityPath: string,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
receiverLogger.verbose(
"[%s] Abandoning the message with id '%s'.",
context.connectionId,
message.messageId
);
return settleMessage(message, DispositionType.abandon, context, entityPath, {
propertiesToModify
});
}

export function deferMessage(
message: ServiceBusMessageImpl,
context: ConnectionContext,
entityPath: string,
propertiesToModify?: { [key: string]: any }
): Promise<void> {
receiverLogger.verbose(
"[%s] Deferring the message with id '%s'.",
context.connectionId,
message.messageId
);
return settleMessage(message, DispositionType.defer, context, entityPath, {
propertiesToModify
});
}

export function deadLetterMessage(
message: ServiceBusMessageImpl,
context: ConnectionContext,
entityPath: string,
propertiesToModify?: DeadLetterOptions & { [key: string]: any }
): Promise<void> {
receiverLogger.verbose(
"[%s] Deadlettering the message with id '%s'.",
context.connectionId,
message.messageId
);

const actualPropertiesToModify: Partial<DeadLetterOptions> = {
...propertiesToModify
};

// these two fields are handled specially and don't need to be in here.
delete actualPropertiesToModify.deadLetterErrorDescription;
delete actualPropertiesToModify.deadLetterReason;

const dispositionStatusOptions: DispositionStatusOptions = {
propertiesToModify: actualPropertiesToModify,
deadLetterReason: propertiesToModify?.deadLetterReason,
deadLetterDescription: propertiesToModify?.deadLetterErrorDescription
};

return settleMessage(
message,
DispositionType.deadletter,
context,
entityPath,
dispositionStatusOptions
);
}

function settleMessage(
message: ServiceBusMessageImpl,
operation: DispositionType,
context: ConnectionContext,
entityPath: string,
options?: DispositionStatusOptions
): Promise<void> {
if (!message.delivery) {
throw new Error("A peeked message cannot be settled.");
}

if (!message.lockToken) {
const error = new Error(
getErrorMessageNotSupportedInReceiveAndDeleteMode(`${operation} the message`)
);
receiverLogger.logError(
error,
"[%s] An error occurred when settling a message with id '%s'",
context.connectionId,
message.messageId
);
throw error;
}
const isDeferredMessage = !message.delivery.link;
const receiver = isDeferredMessage
? undefined
: context.getReceiverFromCache(message.delivery.link.name, message.sessionId);
const associatedLinkName = receiver?.name;

if (!isDeferredMessage) {
// In case the message wasn't from a deferred queue,
// 1. We can verify the remote_settled flag on the delivery
// - If the flag is true, throw an error since the message has been settled (Specifically, with a receive link)
// - If the flag is false, we can't say that the message has not been settled
// since settling with the management link won't update the delivery (In this case, service would throw an error)
// 2. If the message has a session-id and if the associated receiver link is unavailable,
// then throw an error since we need a lock on the session to settle the message.
let error: Error | undefined;
if (message.delivery.remote_settled) {
error = new Error(`Failed to ${operation} the message as this message is already settled.`);
} else if ((!receiver || !receiver.isOpen()) && message.sessionId != undefined) {
error = translate({
description:
`Failed to ${operation} the message as the AMQP link with which the message was ` +
`received is no longer alive.`,
condition: ErrorNameConditionMapper.SessionLockLostError
});
}
if (error) {
receiverLogger.logError(
error,
"[%s] An error occurred when settling a message with id '%s'",
context.connectionId,
message.messageId
);
throw error;
}
}

// Message Settlement with managementLink
// 1. If the received message is deferred as such messages can only be settled using managementLink
// 2. If the associated receiver link is not available. This does not apply to messages from sessions as we need a lock on the session to do so.
if (isDeferredMessage || ((!receiver || !receiver.isOpen()) && message.sessionId == undefined)) {
return context
.getManagementClient(entityPath)
.updateDispositionStatus(message.lockToken, operation, {
...options,
associatedLinkName,
sessionId: message.sessionId
});
}

return receiver!.settleMessage(message, operation, options);
}
Loading