Skip to content

Commit 55150cc

Browse files
authored
[eventhubs-checkpointstore-blob] Updates CheckpointStore methods to support cancellation (Azure#13862)
* [event-hubs] update CheckpointStore interface methods to accept optional OperationOptions * [eventhubs-checkpointstore-blob] update BlobCheckpointStore async methods to support cancellation and passthrough of tracing options * [eventhubs-checkpointstore-blob] update API review file
1 parent da8f04b commit 55150cc

File tree

10 files changed

+322
-31
lines changed

10 files changed

+322
-31
lines changed

sdk/eventhub/event-hubs/CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# Release History
22

3-
## 5.4.1 (Unreleased)
3+
## 5.5.0 (Unreleased)
44

5+
- Updates the methods on the `CheckpointStore` interface to accept
6+
an optional `options` parameter that can be used to pass in an
7+
`abortSignal` and `tracingOptions`.
58

69
## 5.4.0 (2021-02-09)
710

sdk/eventhub/event-hubs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@azure/event-hubs",
33
"sdk-type": "client",
4-
"version": "5.4.1",
4+
"version": "5.5.0",
55
"description": "Azure Event Hubs SDK for JS.",
66
"author": "Microsoft Corporation",
77
"license": "MIT",

sdk/eventhub/event-hubs/review/event-hubs.api.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ export interface Checkpoint {
2727

2828
// @public
2929
export interface CheckpointStore {
30-
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
31-
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<Checkpoint[]>;
32-
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<PartitionOwnership[]>;
33-
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
30+
claimOwnership(partitionOwnership: PartitionOwnership[], options?: OperationOptions): Promise<PartitionOwnership[]>;
31+
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<Checkpoint[]>;
32+
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<PartitionOwnership[]>;
33+
updateCheckpoint(checkpoint: Checkpoint, options?: OperationOptions): Promise<void>;
3434
}
3535

3636
// @public

sdk/eventhub/event-hubs/src/eventProcessor.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { CommonEventProcessorOptions } from "./models/private";
1313
import { CloseReason } from "./models/public";
1414
import { ConnectionContext } from "./connectionContext";
1515
import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStrategy";
16+
import { OperationOptions } from "./util/operationOptions";
1617

1718
/**
1819
* An interface representing the details on which instance of a `EventProcessor` owns processing
@@ -73,28 +74,41 @@ export interface CheckpointStore {
7374
* <yournamespace>.servicebus.windows.net.
7475
* @param eventHubName - The event hub name.
7576
* @param consumerGroup - The consumer group name.
77+
* @param options - A set of options that can be specified to influence the behavior of this method.
78+
* - `abortSignal`: A signal used to request operation cancellation.
79+
* - `tracingOptions`: Options for configuring tracing.
7680
* @returns A list of partition ownership details of all the partitions that have/had an owner.
7781
*/
7882
listOwnership(
7983
fullyQualifiedNamespace: string,
8084
eventHubName: string,
81-
consumerGroup: string
85+
consumerGroup: string,
86+
options?: OperationOptions
8287
): Promise<PartitionOwnership[]>;
8388
/**
8489
* Called to claim ownership of a list of partitions. This will return the list of partitions that were owned
8590
* successfully.
8691
*
8792
* @param partitionOwnership - The list of partition ownership this instance is claiming to own.
93+
* @param options - A set of options that can be specified to influence the behavior of this method.
94+
* - `abortSignal`: A signal used to request operation cancellation.
95+
* - `tracingOptions`: Options for configuring tracing.
8896
* @returns A list of partitions this instance successfully claimed ownership.
8997
*/
90-
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
98+
claimOwnership(
99+
partitionOwnership: PartitionOwnership[],
100+
options?: OperationOptions
101+
): Promise<PartitionOwnership[]>;
91102

92103
/**
93104
* Updates the checkpoint in the data store for a partition.
94105
*
95106
* @param checkpoint - The checkpoint.
107+
* @param options - A set of options that can be specified to influence the behavior of this method.
108+
* - `abortSignal`: A signal used to request operation cancellation.
109+
* - `tracingOptions`: Options for configuring tracing.
96110
*/
97-
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
111+
updateCheckpoint(checkpoint: Checkpoint, options?: OperationOptions): Promise<void>;
98112

99113
/**
100114
* Lists all the checkpoints in a data store for a given namespace, eventhub and consumer group.
@@ -103,11 +117,16 @@ export interface CheckpointStore {
103117
* <yournamespace>.servicebus.windows.net.
104118
* @param eventHubName - The event hub name.
105119
* @param consumerGroup - The consumer group name.
120+
* @param options - A set of options that can be specified to influence the behavior of this method.
121+
* - `abortSignal`: A signal used to request operation cancellation.
122+
* - `tracingOptions`: Options for configuring tracing.
123+
* @returns A list of checkpoints for a given namespace, eventhub, and consumer group.
106124
*/
107125
listCheckpoints(
108126
fullyQualifiedNamespace: string,
109127
eventHubName: string,
110-
consumerGroup: string
128+
consumerGroup: string,
129+
options?: OperationOptions
111130
): Promise<Checkpoint[]>;
112131
}
113132

sdk/eventhub/event-hubs/src/util/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@
66
*/
77
export const packageJsonInfo = {
88
name: "@azure/event-hubs",
9-
version: "5.4.1"
9+
version: "5.5.0"
1010
};

sdk/eventhub/eventhubs-checkpointstore-blob/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
# Release History
22

3-
## 1.0.2 (Unreleased)
3+
## 1.1.0 (Unreleased)
44

5+
- Updates all async methods on `BlobCheckpointStore` to accept
6+
an optional `options` parameter that can be used to pass in an
7+
`abortSignal` and `tracingOptions`.
8+
Resolves issue [#9492](https://github.com/Azure/azure-sdk-for-js/issues/9492).
59

610
## 1.0.1 (2020-08-03)
711

sdk/eventhub/eventhubs-checkpointstore-blob/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@azure/eventhubs-checkpointstore-blob",
33
"sdk-type": "client",
4-
"version": "1.0.2",
4+
"version": "1.1.0",
55
"description": "An Azure Storage Blob solution to store checkpoints when using Event Hubs.",
66
"author": "Microsoft Corporation",
77
"license": "MIT",
@@ -60,7 +60,8 @@
6060
"docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src"
6161
},
6262
"dependencies": {
63-
"@azure/event-hubs": "^5.0.0",
63+
"@azure/abort-controller": "^1.0.0",
64+
"@azure/event-hubs": "^5.5.0",
6465
"@azure/logger": "^1.0.0",
6566
"@azure/storage-blob": "^12.4.1",
6667
"events": "^3.0.0",

sdk/eventhub/eventhubs-checkpointstore-blob/review/eventhubs-checkpointstore-blob.api.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@
77
import { Checkpoint } from '@azure/event-hubs';
88
import { CheckpointStore } from '@azure/event-hubs';
99
import { ContainerClient } from '@azure/storage-blob';
10+
import { OperationOptions } from '@azure/event-hubs';
1011
import { PartitionOwnership } from '@azure/event-hubs';
1112

1213
// @public
1314
export class BlobCheckpointStore implements CheckpointStore {
1415
constructor(containerClient: ContainerClient);
15-
claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]>;
16-
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<Checkpoint[]>;
17-
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string): Promise<PartitionOwnership[]>;
18-
updateCheckpoint(checkpoint: Checkpoint): Promise<void>;
16+
claimOwnership(partitionOwnership: PartitionOwnership[], options?: OperationOptions): Promise<PartitionOwnership[]>;
17+
listCheckpoints(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<Checkpoint[]>;
18+
listOwnership(fullyQualifiedNamespace: string, eventHubName: string, consumerGroup: string, options?: OperationOptions): Promise<PartitionOwnership[]>;
19+
updateCheckpoint(checkpoint: Checkpoint, options?: OperationOptions): Promise<void>;
1920
}
2021

2122
// @public

sdk/eventhub/eventhubs-checkpointstore-blob/src/blobCheckpointStore.ts

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import { CheckpointStore, PartitionOwnership, Checkpoint } from "@azure/event-hubs";
4+
import {
5+
CheckpointStore,
6+
PartitionOwnership,
7+
Checkpoint,
8+
OperationOptions
9+
} from "@azure/event-hubs";
510
import { ContainerClient, Metadata, RestError, BlobSetMetadataResponse } from "@azure/storage-blob";
611
import { logger, logErrorStackTrace } from "./log";
712
import { throwTypeErrorIfParameterMissing } from "./util/error";
@@ -24,14 +29,19 @@ export class BlobCheckpointStore implements CheckpointStore {
2429
* <yournamespace>.servicebus.windows.net.
2530
* @param eventHubName - The event hub name.
2631
* @param consumerGroup - The consumer group name.
32+
* @param options - A set of options that can be specified to influence the behavior of this method.
33+
* - `abortSignal`: A signal used to request operation cancellation.
34+
* - `tracingOptions`: Options for configuring tracing.
2735
* @returns Partition ownership details of all the partitions that have had an owner.
2836
*/
2937
async listOwnership(
3038
fullyQualifiedNamespace: string,
3139
eventHubName: string,
32-
consumerGroup: string
40+
consumerGroup: string,
41+
options: OperationOptions = {}
3342
): Promise<PartitionOwnership[]> {
3443
const partitionOwnershipArray: PartitionOwnership[] = [];
44+
const { abortSignal, tracingOptions } = options;
3545

3646
const blobPrefix = BlobCheckpointStore.getBlobPrefix({
3747
type: "ownership",
@@ -42,8 +52,10 @@ export class BlobCheckpointStore implements CheckpointStore {
4252

4353
try {
4454
const blobs = this._containerClient.listBlobsFlat({
55+
abortSignal,
4556
includeMetadata: true,
46-
prefix: blobPrefix
57+
prefix: blobPrefix,
58+
tracingOptions
4759
});
4860

4961
for await (const blob of blobs) {
@@ -72,6 +84,9 @@ export class BlobCheckpointStore implements CheckpointStore {
7284
} catch (err) {
7385
logger.warning(`Error occurred while fetching the list of blobs`, err.message);
7486
logErrorStackTrace(err);
87+
88+
if (err?.name === "AbortError") throw err;
89+
7590
throw new Error(`Error occurred while fetching the list of blobs. \n${err}`);
7691
}
7792
}
@@ -81,9 +96,15 @@ export class BlobCheckpointStore implements CheckpointStore {
8196
* successfully claimed.
8297
*
8398
* @param partitionOwnership - The list of partition ownership this instance is claiming to own.
99+
* @param options - A set of options that can be specified to influence the behavior of this method.
100+
* - `abortSignal`: A signal used to request operation cancellation.
101+
* - `tracingOptions`: Options for configuring tracing.
84102
* @returns A list partitions this instance successfully claimed ownership.
85103
*/
86-
async claimOwnership(partitionOwnership: PartitionOwnership[]): Promise<PartitionOwnership[]> {
104+
async claimOwnership(
105+
partitionOwnership: PartitionOwnership[],
106+
options: OperationOptions = {}
107+
): Promise<PartitionOwnership[]> {
87108
const partitionOwnershipArray: PartitionOwnership[] = [];
88109
for (const ownership of partitionOwnership) {
89110
const blobName = BlobCheckpointStore.getBlobPrefix({ type: "ownership", ...ownership });
@@ -93,7 +114,8 @@ export class BlobCheckpointStore implements CheckpointStore {
93114
{
94115
ownerid: ownership.ownerId
95116
},
96-
ownership.etag
117+
ownership.etag,
118+
options
97119
);
98120

99121
if (updatedBlobResponse.lastModified) {
@@ -138,12 +160,17 @@ export class BlobCheckpointStore implements CheckpointStore {
138160
* <yournamespace>.servicebus.windows.net.
139161
* @param eventHubName - The event hub name.
140162
* @param consumerGroup - The consumer group name.
163+
* @param options - A set of options that can be specified to influence the behavior of this method.
164+
* - `abortSignal`: A signal used to request operation cancellation.
165+
* - `tracingOptions`: Options for configuring tracing.
141166
*/
142167
async listCheckpoints(
143168
fullyQualifiedNamespace: string,
144169
eventHubName: string,
145-
consumerGroup: string
170+
consumerGroup: string,
171+
options: OperationOptions = {}
146172
): Promise<Checkpoint[]> {
173+
const { abortSignal, tracingOptions } = options;
147174
const blobPrefix = BlobCheckpointStore.getBlobPrefix({
148175
type: "checkpoint",
149176
fullyQualifiedNamespace,
@@ -152,8 +179,10 @@ export class BlobCheckpointStore implements CheckpointStore {
152179
});
153180

154181
const blobs = this._containerClient.listBlobsFlat({
182+
abortSignal,
155183
includeMetadata: true,
156-
prefix: blobPrefix
184+
prefix: blobPrefix,
185+
tracingOptions
157186
});
158187

159188
const checkpoints: Checkpoint[] = [];
@@ -188,9 +217,12 @@ export class BlobCheckpointStore implements CheckpointStore {
188217
* Updates the checkpoint in the data store for a partition.
189218
*
190219
* @param checkpoint - The checkpoint.
220+
* @param options - A set of options that can be specified to influence the behavior of this method.
221+
* - `abortSignal`: A signal used to request operation cancellation.
222+
* - `tracingOptions`: Options for configuring tracing.
191223
* @returns The new etag on successful update.
192224
*/
193-
async updateCheckpoint(checkpoint: Checkpoint): Promise<void> {
225+
async updateCheckpoint(checkpoint: Checkpoint, options: OperationOptions = {}): Promise<void> {
194226
throwTypeErrorIfParameterMissing(
195227
"updateCheckpoint",
196228
"sequenceNumber",
@@ -206,7 +238,8 @@ export class BlobCheckpointStore implements CheckpointStore {
206238
sequencenumber: checkpoint.sequenceNumber.toString(),
207239
offset: checkpoint.offset.toString()
208240
},
209-
undefined
241+
undefined,
242+
options
210243
);
211244

212245
logger.verbose(
@@ -222,6 +255,9 @@ export class BlobCheckpointStore implements CheckpointStore {
222255
err.message
223256
);
224257
logErrorStackTrace(err);
258+
259+
if (err?.name === "AbortError") throw err;
260+
225261
throw new Error(
226262
`Error occurred while upating the checkpoint for partition: ${checkpoint.partitionId}, ${err}`
227263
);
@@ -251,24 +287,31 @@ export class BlobCheckpointStore implements CheckpointStore {
251287
private async _setBlobMetadata(
252288
blobName: string,
253289
metadata: OwnershipMetadata | CheckpointMetadata,
254-
etag: string | undefined
290+
etag: string | undefined,
291+
options: OperationOptions = {}
255292
): Promise<BlobSetMetadataResponse> {
293+
const { abortSignal, tracingOptions } = options;
256294
const blockBlobClient = this._containerClient.getBlobClient(blobName).getBlockBlobClient();
257295

258296
// When we have an etag, we know the blob existed.
259297
// If we encounter an error we should fail.
260298
if (etag) {
261299
return blockBlobClient.setMetadata(metadata as Metadata, {
300+
abortSignal,
262301
conditions: {
263302
ifMatch: etag
264-
}
303+
},
304+
tracingOptions
265305
});
266306
} else {
267307
try {
268308
// Attempt to set metadata, and fallback to upload if the blob doesn't already exist.
269309
// This avoids poor performance in storage accounts with soft-delete or blob versioning enabled.
270310
// https://github.com/Azure/azure-sdk-for-js/issues/10132
271-
return await blockBlobClient.setMetadata(metadata as Metadata);
311+
return await blockBlobClient.setMetadata(metadata as Metadata, {
312+
abortSignal,
313+
tracingOptions
314+
});
272315
} catch (err) {
273316
// Check if the error is `BlobNotFound` and fallback to `upload` if it is.
274317
if (err?.name !== "RestError") {
@@ -281,7 +324,9 @@ export class BlobCheckpointStore implements CheckpointStore {
281324
}
282325

283326
return blockBlobClient.upload("", 0, {
284-
metadata: metadata as Metadata
327+
abortSignal,
328+
metadata: metadata as Metadata,
329+
tracingOptions
285330
});
286331
}
287332
}

0 commit comments

Comments
 (0)