diff --git a/sdk/eventhub/event-processor-host/samples/iothubEph.ts b/sdk/eventhub/event-processor-host/samples/iothubEph.ts index 5d31f5bced56..5e0e1246c00f 100644 --- a/sdk/eventhub/event-processor-host/samples/iothubEph.ts +++ b/sdk/eventhub/event-processor-host/samples/iothubEph.ts @@ -33,7 +33,7 @@ async function main(): Promise { await stopEph(eph); } -main().catch(err => { +main().catch((err) => { console.log("Exiting from main() due to an error: %O.", err); }); @@ -58,7 +58,9 @@ async function startEph(ephName: string): Promise { // Message handler const partionCount: { [x: string]: number } = {}; const onMessage: OnReceivedMessage = async (context: PartitionContext, event: EventData) => { - !partionCount[context.partitionId] ? (partionCount[context.partitionId] = 1) : partionCount[context.partitionId]++; + !partionCount[context.partitionId] + ? (partionCount[context.partitionId] = 1) + : partionCount[context.partitionId]++; console.log( "[%s] %d - Received message from partition: '%s', offset: '%s'", ephName, @@ -75,7 +77,11 @@ async function startEph(ephName: string): Promise { eph.receivingFromPartitions ); await context.checkpointFromEventData(event); - console.log("[%s] Successfully checkpointed message number %d", ephName, partionCount[context.partitionId]); + console.log( + "[%s] Successfully checkpointed message number %d", + ephName, + partionCount[context.partitionId] + ); } catch (err) { console.log( "[%s] An error occurred while checkpointing msg number %d: %O", diff --git a/sdk/eventhub/event-processor-host/samples/multiEph.ts b/sdk/eventhub/event-processor-host/samples/multiEph.ts index 9816d0d0fb5d..404bef9060c3 100644 --- a/sdk/eventhub/event-processor-host/samples/multiEph.ts +++ b/sdk/eventhub/event-processor-host/samples/multiEph.ts @@ -46,7 +46,7 @@ async function main(): Promise { await stopEph(eph2); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); @@ -75,7 +75,9 @@ async function startEph(ephName: string): Promise { // Message handler const partionCount: { [x: string]: number } = {}; const onMessage: OnReceivedMessage = async (context: PartitionContext, event: EventData) => { - !partionCount[context.partitionId] ? (partionCount[context.partitionId] = 1) : partionCount[context.partitionId]++; + !partionCount[context.partitionId] + ? (partionCount[context.partitionId] = 1) + : partionCount[context.partitionId]++; console.log( "[%s] %d - Received message from partition: '%s', offset: '%s'", ephName, @@ -92,7 +94,11 @@ async function startEph(ephName: string): Promise { eph.receivingFromPartitions ); await context.checkpointFromEventData(event); - console.log("[%s] Successfully checkpointed message number %d", ephName, partionCount[context.partitionId]); + console.log( + "[%s] Successfully checkpointed message number %d", + ephName, + partionCount[context.partitionId] + ); } catch (err) { console.log( "[%s] An error occurred while checkpointing msg number %d: %O", diff --git a/sdk/eventhub/event-processor-host/samples/sendBatch.ts b/sdk/eventhub/event-processor-host/samples/sendBatch.ts index e07ef48a5081..c5868efcc7e2 100644 --- a/sdk/eventhub/event-processor-host/samples/sendBatch.ts +++ b/sdk/eventhub/event-processor-host/samples/sendBatch.ts @@ -31,12 +31,12 @@ async function main(): Promise { console.log("Sending batch events..."); // Will concurrently send batched events to all the partitions. - const sendPromises = partitionIds.map(id => client.sendBatch(events, id)); + const sendPromises = partitionIds.map((id) => client.sendBatch(events, id)); await Promise.all(sendPromises); await client.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/eventhub/event-processor-host/samples/singleEph.ts b/sdk/eventhub/event-processor-host/samples/singleEph.ts index 880738375ff2..5a493934d481 100644 --- a/sdk/eventhub/event-processor-host/samples/singleEph.ts +++ b/sdk/eventhub/event-processor-host/samples/singleEph.ts @@ -42,7 +42,7 @@ async function main(): Promise { await stopEph(eph); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); @@ -68,7 +68,9 @@ async function startEph(ephName: string): Promise { // Message handler const partionCount: { [x: string]: number } = {}; const onMessage: OnReceivedMessage = async (context: PartitionContext, event: EventData) => { - !partionCount[context.partitionId] ? (partionCount[context.partitionId] = 1) : partionCount[context.partitionId]++; + !partionCount[context.partitionId] + ? (partionCount[context.partitionId] = 1) + : partionCount[context.partitionId]++; console.log( "[%s] %d - Received message from partition: '%s', offset: '%s'", ephName, @@ -85,7 +87,11 @@ async function startEph(ephName: string): Promise { eph.receivingFromPartitions ); await context.checkpointFromEventData(event); - console.log("[%s] Successfully checkpointed message number %d", ephName, partionCount[context.partitionId]); + console.log( + "[%s] Successfully checkpointed message number %d", + ephName, + partionCount[context.partitionId] + ); } catch (err) { console.log( "[%s] An error occurred while checkpointing msg number %d: %O", diff --git a/sdk/eventhub/event-processor-host/samples/tsconfig.json b/sdk/eventhub/event-processor-host/samples/tsconfig.json index c400c95a8623..82e45c21672e 100644 --- a/sdk/eventhub/event-processor-host/samples/tsconfig.json +++ b/sdk/eventhub/event-processor-host/samples/tsconfig.json @@ -3,11 +3,6 @@ "compilerOptions": { "module": "commonjs" }, - "include": [ - "**/*.ts" - ], - "exclude": [ - "../node_modules", - "../typings/**", - ] + "include": ["**/*.ts"], + "exclude": ["../node_modules", "../typings/**"] } diff --git a/sdk/eventhub/event-processor-host/src/azureBlob.ts b/sdk/eventhub/event-processor-host/src/azureBlob.ts index 124b70f43365..3cf62dec121b 100644 --- a/sdk/eventhub/event-processor-host/src/azureBlob.ts +++ b/sdk/eventhub/event-processor-host/src/azureBlob.ts @@ -81,8 +81,16 @@ export class AzureBlob { return this._blobService.getContent(this._containerName, this._blobPath, options); } - changeLease(currentLeaseId: string, proposedLeaseId: string): Promise { - return this._blobService.changeLease(this._containerName, this._blobPath, currentLeaseId, proposedLeaseId); + changeLease( + currentLeaseId: string, + proposedLeaseId: string + ): Promise { + return this._blobService.changeLease( + this._containerName, + this._blobPath, + currentLeaseId, + proposedLeaseId + ); } getBlobProperties(): Promise { @@ -97,7 +105,12 @@ export class AzureBlob { metadata: Dictionary, options?: StorageBlobService.BlobRequestOptions ): Promise { - return this._blobService.setBlobMetadata(this._containerName, this._blobPath, metadata, options); + return this._blobService.setBlobMetadata( + this._containerName, + this._blobPath, + metadata, + options + ); } listBlobsSegmented( @@ -106,7 +119,9 @@ export class AzureBlob { return this._blobService.listBlobsSegmented(this._containerName, options); } - acquireLease(options: StorageBlobService.AcquireLeaseRequestOptions): Promise { + acquireLease( + options: StorageBlobService.AcquireLeaseRequestOptions + ): Promise { return this._blobService.acquireLease(this._containerName, this._blobPath, options); } diff --git a/sdk/eventhub/event-processor-host/src/azureStorageCheckpointLeaseManager.ts b/sdk/eventhub/event-processor-host/src/azureStorageCheckpointLeaseManager.ts index 92c2645e8d6d..1b6589218252 100644 --- a/sdk/eventhub/event-processor-host/src/azureStorageCheckpointLeaseManager.ts +++ b/sdk/eventhub/event-processor-host/src/azureStorageCheckpointLeaseManager.ts @@ -107,7 +107,9 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le await Promise.all(deleteBlobs); await blobService.deleteContainerIfExists(storageContainerName); } else { - throw new Error("'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs."); + throw new Error( + "'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs." + ); } } catch (err) { const msg = @@ -189,7 +191,9 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le const leaseBlobs = await this._listBlobs(); if (leaseBlobs.length === partitionIds.length) { log.checkpointLeaseMgr( - withHost("Number of blobs %d === Number of partitionIds %d. " + "Hence no need to create leases."), + withHost( + "Number of blobs %d === Number of partitionIds %d. " + "Hence no need to create leases." + ), leaseBlobs.length, partitionIds.length ); @@ -235,7 +239,10 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le returnLease = await this.getLease(partitionId); } else { log.error( - withHostAndPartition(partitionId, "An error occurred while creating lease if " + "it does not exist: %O."), + withHostAndPartition( + partitionId, + "An error occurred while creating lease if " + "it does not exist: %O." + ), error ); throw error; @@ -277,7 +284,10 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le return false; } log.checkpointLeaseMgr( - withHostAndPartition(lease, "Need to change lease '%s' -> '%s' " + "for partitionId '%s'."), + withHostAndPartition( + lease, + "Need to change lease '%s' -> '%s' " + "for partitionId '%s'." + ), lease.token, newLeaseId, lease.partitionId @@ -364,7 +374,8 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le log.checkpointLeaseMgr( withHostAndPartition( lease, - "Let us renew the lease to make sure the " + "update with offset '%s' and sequence number %d will go through." + "Let us renew the lease to make sure the " + + "update with offset '%s' and sequence number %d will go through." ), lease.offset, lease.sequenceNumber @@ -463,7 +474,9 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le log.checkpointLeaseMgr(withHost("Number of blobs: %d"), listResult.entries.length); return listResult.entries; } else { - throw new Error("'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs."); + throw new Error( + "'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs." + ); } } @@ -521,7 +534,10 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le options.metadata[metadataOwnerName] = lease.owner || this._context.hostName; } log.checkpointLeaseMgr( - withHostAndPartition(lease, "Trying to upload raw JSON for activity " + "'%s': %s, with options: %o"), + withHostAndPartition( + lease, + "Trying to upload raw JSON for activity " + "'%s': %s, with options: %o" + ), activity, jsonToUpload, options @@ -546,7 +562,11 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le result = true; } } - log.error(withHostAndPartition(partitionId, "Was lease lost -> %s, err: %O."), result, getStorageError(err)); + log.error( + withHostAndPartition(partitionId, "Was lease lost -> %s, err: %O."), + result, + getStorageError(err) + ); return result; } } diff --git a/sdk/eventhub/event-processor-host/src/blobService.ts b/sdk/eventhub/event-processor-host/src/blobService.ts index 14586dcf510b..6ac3b1eb0b16 100644 --- a/sdk/eventhub/event-processor-host/src/blobService.ts +++ b/sdk/eventhub/event-processor-host/src/blobService.ts @@ -2,7 +2,11 @@ // Licensed under the MIT License. import { Dictionary } from "@azure/event-hubs"; -import { createBlobService, BlobService as StorageBlobService, ServiceResponse } from "azure-storage"; +import { + createBlobService, + BlobService as StorageBlobService, + ServiceResponse +} from "azure-storage"; import * as log from "./log"; import { validateType, getStorageError } from "./util/utils"; import { defaultMaximumExecutionTimeInMs } from "./util/constants"; @@ -89,22 +93,34 @@ export class BlobService { validateType("containerName", containerName, true, "string"); return new Promise((resolve, reject) => { - log.blobService("[%s] Ensuring that the container '%s' exists.", this._hostName, containerName); - this._storageBlobService.createContainerIfNotExists(containerName, (error, result, response) => { - if (error) { - log.error( - "[%s] An error occurred while ensuring that the container '%s' exists: %O", - this._hostName, - containerName, - getStorageError(error) - ); - reject(error); - } else { - const containerInfo = { created: result, details: response }; - log.blobService("[%s] Result for Container '%s': %O", this._hostName, containerName, containerInfo); - resolve(containerInfo); + log.blobService( + "[%s] Ensuring that the container '%s' exists.", + this._hostName, + containerName + ); + this._storageBlobService.createContainerIfNotExists( + containerName, + (error, result, response) => { + if (error) { + log.error( + "[%s] An error occurred while ensuring that the container '%s' exists: %O", + this._hostName, + containerName, + getStorageError(error) + ); + reject(error); + } else { + const containerInfo = { created: result, details: response }; + log.blobService( + "[%s] Result for Container '%s': %O", + this._hostName, + containerName, + containerInfo + ); + resolve(containerInfo); + } } - }); + ); }); } @@ -122,7 +138,12 @@ export class BlobService { ); reject(error); } else { - log.blobService("[%s] Does container '%s' exist -> %s.", this._hostName, containerName, result.exists); + log.blobService( + "[%s] Does container '%s' exist -> %s.", + this._hostName, + containerName, + result.exists + ); resolve(result.exists); } }); @@ -138,7 +159,8 @@ export class BlobService { this._storageBlobService.doesBlobExist(containerName, blobPath, (error, result) => { if (error) { log.error( - "[%s] [%s] An error occurred while determining whether the blob '%s' exists in " + "container '%s': %O", + "[%s] [%s] An error occurred while determining whether the blob '%s' exists in " + + "container '%s': %O", this._hostName, partitionId, blobPath, @@ -180,26 +202,33 @@ export class BlobService { blobPath, containerName ); - this._storageBlobService.createBlockBlobFromText(containerName, blobPath, text, options, error => { - if (error) { - if ((error as any).statusCode === 412) { - // Blob already exists. - resolve(); + this._storageBlobService.createBlockBlobFromText( + containerName, + blobPath, + text, + options, + (error) => { + if (error) { + if ((error as any).statusCode === 412) { + // Blob already exists. + resolve(); + } else { + log.error( + "[%s] [%s] An error occurred while ensuring that blob '%s' exists in " + + "container '%s': %O", + this._hostName, + partitionId, + blobPath, + containerName, + getStorageError(error) + ); + reject(error); + } } else { - log.error( - "[%s] [%s] An error occurred while ensuring that blob '%s' exists in " + "container '%s': %O", - this._hostName, - partitionId, - blobPath, - containerName, - getStorageError(error) - ); - reject(error); + resolve(); } - } else { - resolve(); } - }); + ); }); } @@ -224,28 +253,34 @@ export class BlobService { leaseId, blobPath ); - this._storageBlobService.renewLease(containerName, blobPath, leaseId, options, (error, result) => { - if (error) { - log.error( - "[%s] [%s] An error occurred while renewing lease '%s' for blobPath '%s': %O.", - this._hostName, - partitionId, - leaseId, - blobPath, - getStorageError(error) - ); - reject(error); - } else { - log.blobService( - "[%s] [%s] Successfully, renewed lease with leaseId: '%s' for blobPath '%s'.", - this._hostName, - partitionId, - leaseId, - blobPath - ); - resolve(result); + this._storageBlobService.renewLease( + containerName, + blobPath, + leaseId, + options, + (error, result) => { + if (error) { + log.error( + "[%s] [%s] An error occurred while renewing lease '%s' for blobPath '%s': %O.", + this._hostName, + partitionId, + leaseId, + blobPath, + getStorageError(error) + ); + reject(error); + } else { + log.blobService( + "[%s] [%s] Successfully, renewed lease with leaseId: '%s' for blobPath '%s'.", + this._hostName, + partitionId, + leaseId, + blobPath + ); + resolve(result); + } } - }); + ); }); } @@ -270,28 +305,34 @@ export class BlobService { leaseId, blobPath ); - this._storageBlobService.releaseLease(containerName, blobPath, leaseId, options, (error, result) => { - if (error) { - log.error( - "[%s] [%s] An error occurred while releasing lease '%s' for blobPath '%s': %O.", - this._hostName, - partitionId, - leaseId, - blobPath, - getStorageError(error) - ); - reject(error); - } else { - log.blobService( - "[%s] [%s] Successfully, released lease with leaseId: '%s' for blobPath '%s'.", - this._hostName, - partitionId, - leaseId, - blobPath - ); - resolve(result); + this._storageBlobService.releaseLease( + containerName, + blobPath, + leaseId, + options, + (error, result) => { + if (error) { + log.error( + "[%s] [%s] An error occurred while releasing lease '%s' for blobPath '%s': %O.", + this._hostName, + partitionId, + leaseId, + blobPath, + getStorageError(error) + ); + reject(error); + } else { + log.blobService( + "[%s] [%s] Successfully, released lease with leaseId: '%s' for blobPath '%s'.", + this._hostName, + partitionId, + leaseId, + blobPath + ); + resolve(result); + } } - }); + ); }); } @@ -317,28 +358,34 @@ export class BlobService { containerName, blobPath ); - this._storageBlobService.createBlockBlobFromText(containerName, blobPath, text, options, (error, result) => { - if (error) { - log.error( - "[%s] [%s] An error occurred while updating content '%s' to blobPath '%s': %O.", - this._hostName, - partitionId, - text, - blobPath, - getStorageError(error) - ); - reject(error); - } else { - log.blobService( - "[%s] [%s] Successfully, updated blob content '%s' for blobPath '%s'.", - this._hostName, - partitionId, - text, - blobPath - ); - resolve(result); + this._storageBlobService.createBlockBlobFromText( + containerName, + blobPath, + text, + options, + (error, result) => { + if (error) { + log.error( + "[%s] [%s] An error occurred while updating content '%s' to blobPath '%s': %O.", + this._hostName, + partitionId, + text, + blobPath, + getStorageError(error) + ); + reject(error); + } else { + log.blobService( + "[%s] [%s] Successfully, updated blob content '%s' for blobPath '%s'.", + this._hostName, + partitionId, + text, + blobPath + ); + resolve(result); + } } - }); + ); }); } @@ -354,28 +401,38 @@ export class BlobService { return new Promise((resolve, reject) => { if (!options) options = {}; - log.blobService("[%s] [%s] Attempting to getcontent from blobPath '%s'.", this._hostName, partitionId, blobPath); - this._storageBlobService.getBlobToText(containerName, blobPath, options, (error, text, result) => { - if (error) { - log.error( - "[%s] [%s] An error occurred while getting content from blobPath '%s': %O.", - this._hostName, - partitionId, - blobPath, - getStorageError(error) - ); - reject(error); - } else { - log.blobService( - "[%s] [%s] Successfully, fetched blob content '%s' for blobPath '%s'.", - this._hostName, - partitionId, - text, - blobPath - ); - resolve(text); + log.blobService( + "[%s] [%s] Attempting to getcontent from blobPath '%s'.", + this._hostName, + partitionId, + blobPath + ); + this._storageBlobService.getBlobToText( + containerName, + blobPath, + options, + (error, text, result) => { + if (error) { + log.error( + "[%s] [%s] An error occurred while getting content from blobPath '%s': %O.", + this._hostName, + partitionId, + blobPath, + getStorageError(error) + ); + reject(error); + } else { + log.blobService( + "[%s] [%s] Successfully, fetched blob content '%s' for blobPath '%s'.", + this._hostName, + partitionId, + text, + blobPath + ); + resolve(text); + } } - }); + ); }); } @@ -408,7 +465,8 @@ export class BlobService { (error, result) => { if (error) { log.error( - "[%s] [%s] An error occurred while changing lease '%s' to '%s' for blobPath " + "'%s': %O.", + "[%s] [%s] An error occurred while changing lease '%s' to '%s' for blobPath " + + "'%s': %O.", this._hostName, partitionId, currentLeaseId, @@ -419,7 +477,8 @@ export class BlobService { reject(error); } else { log.blobService( - "[%s] [%s] Successfully, changed current lease '%s' with proposed lease " + "'%s' for blobPath '%s'.", + "[%s] [%s] Successfully, changed current lease '%s' with proposed lease " + + "'%s' for blobPath '%s'.", this._hostName, partitionId, currentLeaseId, @@ -433,7 +492,10 @@ export class BlobService { }); } - getBlobProperties(containerName: string, blobPath: string): Promise { + getBlobProperties( + containerName: string, + blobPath: string + ): Promise { validateType("containerName", containerName, true, "string"); validateType("blobPath", blobPath, true, "string"); const partitionId = path.basename(blobPath); @@ -480,25 +542,34 @@ export class BlobService { }; } return new Promise((resolve, reject) => { - log.blobService("[%s] Attempting to list blobs for container '%s'.", this._hostName, containerName); - this._storageBlobService.listBlobsSegmented(containerName, undefined as any, options!, (error, result) => { - if (error) { - log.error( - "[%s] An error occurred while listing blobs for container '%s': %O.", - this._hostName, - containerName, - getStorageError(error) - ); - reject(error); - } else { - log.blobService( - "[%s] Successfully, received the list of blobs for container '%s'.", - this._hostName, - containerName - ); - resolve(result); + log.blobService( + "[%s] Attempting to list blobs for container '%s'.", + this._hostName, + containerName + ); + this._storageBlobService.listBlobsSegmented( + containerName, + undefined as any, + options!, + (error, result) => { + if (error) { + log.error( + "[%s] An error occurred while listing blobs for container '%s': %O.", + this._hostName, + containerName, + getStorageError(error) + ); + reject(error); + } else { + log.blobService( + "[%s] Successfully, received the list of blobs for container '%s'.", + this._hostName, + containerName + ); + resolve(result); + } } - }); + ); }); } @@ -559,30 +630,36 @@ export class BlobService { metadata, blobPath ); - this._storageBlobService.setBlobMetadata(containerName, blobPath, metadata, options!, (error, result) => { - if (error) { - log.error( - "[%s] [%s] An error occurred while setting blob metadata for blobPath '%s': %O.", - this._hostName, - partitionId, - blobPath, - getStorageError(error) - ); - reject(error); - } else { - log.blobService( - "[%s] [%s] Successfully, set the blob metadata for blobPath '%s'. " + - "The result is: name: %s, metadata: %o, lease: %o", - this._hostName, - partitionId, - blobPath, - result.name, - result.metadata, - result.lease - ); - resolve(result); + this._storageBlobService.setBlobMetadata( + containerName, + blobPath, + metadata, + options!, + (error, result) => { + if (error) { + log.error( + "[%s] [%s] An error occurred while setting blob metadata for blobPath '%s': %O.", + this._hostName, + partitionId, + blobPath, + getStorageError(error) + ); + reject(error); + } else { + log.blobService( + "[%s] [%s] Successfully, set the blob metadata for blobPath '%s'. " + + "The result is: name: %s, metadata: %o, lease: %o", + this._hostName, + partitionId, + blobPath, + result.name, + result.metadata, + result.lease + ); + resolve(result); + } } - }); + ); }); } @@ -634,7 +711,12 @@ export class BlobService { const partitionId = path.basename(blobPath); return new Promise((resolve, reject) => { - log.blobService("[%s] Attempting to delete blob for blobPath '%s'.", this._hostName, partitionId, blobPath); + log.blobService( + "[%s] Attempting to delete blob for blobPath '%s'.", + this._hostName, + partitionId, + blobPath + ); this._storageBlobService.deleteBlobIfExists(containerName, blobPath, (error, result) => { if (error) { log.error( @@ -675,7 +757,12 @@ export class BlobService { ); reject(error); } else { - log.blobService("[%s] Deleted container '%s' -> %s.", this._hostName, containerName, result); + log.blobService( + "[%s] Deleted container '%s' -> %s.", + this._hostName, + containerName, + result + ); resolve(); } }); diff --git a/sdk/eventhub/event-processor-host/src/checkpointInfo.ts b/sdk/eventhub/event-processor-host/src/checkpointInfo.ts index abff8b9a565b..b54ab6fadf73 100644 --- a/sdk/eventhub/event-processor-host/src/checkpointInfo.ts +++ b/sdk/eventhub/event-processor-host/src/checkpointInfo.ts @@ -37,7 +37,11 @@ export namespace CheckpointInfo { * @param {number} [sequenceNumber] The sequence number of the event to be checked in. * @return {CheckpointInfo} CheckpointInfo */ - export function create(partitionId: string, offset?: string, sequenceNumber?: number): CheckpointInfo { + export function create( + partitionId: string, + offset?: string, + sequenceNumber?: number + ): CheckpointInfo { validateType("partitionId", partitionId, true, "string"); validateType("offset", offset, false, "string"); validateType("sequenceNumber", sequenceNumber, false, "number"); diff --git a/sdk/eventhub/event-processor-host/src/eventProcessorHost.ts b/sdk/eventhub/event-processor-host/src/eventProcessorHost.ts index be8fb2943481..64f59adb64a6 100644 --- a/sdk/eventhub/event-processor-host/src/eventProcessorHost.ts +++ b/sdk/eventhub/event-processor-host/src/eventProcessorHost.ts @@ -84,7 +84,9 @@ export class EventProcessorHost { * * @returns {EventHubPartitionRuntimeInformation} EventHubPartitionRuntimeInformation */ - getPartitionInformation(partitionId: string | number): Promise { + getPartitionInformation( + partitionId: string | number + ): Promise { return this._context.getPartitionInformation(partitionId); } @@ -372,7 +374,11 @@ export class EventProcessorHost { storageContainerName: string, namespace: string, eventHubPath: string, - credentials: ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials, + credentials: + | ApplicationTokenCredentials + | UserTokenCredentials + | DeviceTokenCredentials + | MSITokenCredentials, options?: FromTokenProviderOptions ): EventProcessorHost { if (!options) options = {}; @@ -425,7 +431,11 @@ export class EventProcessorHost { hostName: string, namespace: string, eventHubPath: string, - credentials: ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials, + credentials: + | ApplicationTokenCredentials + | UserTokenCredentials + | DeviceTokenCredentials + | MSITokenCredentials, checkpointManager: CheckpointManager, leaseManager: LeaseManager, options?: FromTokenProviderOptions diff --git a/sdk/eventhub/event-processor-host/src/hostContext.ts b/sdk/eventhub/event-processor-host/src/hostContext.ts index 7903cbfcae10..9ae5c0a7f9cf 100644 --- a/sdk/eventhub/event-processor-host/src/hostContext.ts +++ b/sdk/eventhub/event-processor-host/src/hostContext.ts @@ -26,7 +26,12 @@ import { validateType } from "./util/utils"; import { PartitionContext } from "./partitionContext"; import { BaseLease } from "./baseLease"; import { PartitionPump } from "./partitionPump"; -import { EventProcessorHostOptions, OnEphError, OnReceivedMessage, OnReceivedError } from "./modelTypes"; +import { + EventProcessorHostOptions, + OnEphError, + OnReceivedMessage, + OnReceivedError +} from "./modelTypes"; import { maxLeaseDurationInSeconds, minLeaseDurationInSeconds, @@ -81,7 +86,9 @@ export interface HostContextWithCheckpointLeaseManager extends BaseHostContext { checkpointManager: CheckpointManager; getEventHubClient(): EventHubClient; getHubRuntimeInformation(): Promise; - getPartitionInformation(partitionId: string | number): Promise; + getPartitionInformation( + partitionId: string | number + ): Promise; getPartitionIds(): Promise; } @@ -103,13 +110,15 @@ export namespace HostContext { validateType("leaseRenewInterval", interval, true, "number"); if (duration <= interval) { - throw new Error(`Lease duration ${duration} needs to be greater than lease ` + `renew interval ${interval}.`); + throw new Error( + `Lease duration ${duration} needs to be greater than lease ` + `renew interval ${interval}.` + ); } if (duration > maxLeaseDurationInSeconds || duration < minLeaseDurationInSeconds) { throw new Error( `Lease duration needs to be between ${minLeaseDurationInSeconds} ` + - `seconds and ${maxLeaseDurationInSeconds} seconds. The given value is: ${duration} seconds.` + `seconds and ${maxLeaseDurationInSeconds} seconds. The given value is: ${duration} seconds.` ); } } @@ -118,12 +127,14 @@ export namespace HostContext { if (!name || name.match(/^[a-z0-9](([a-z0-9\-[^\-])){1,61}[a-z0-9]$/gi) === null) { throw new Error( `Azure Storage lease container name "${name}" is invalid. Please check ` + - `naming conventions at https://msdn.microsoft.com/en-us/library/azure/dd135715.aspx` + `naming conventions at https://msdn.microsoft.com/en-us/library/azure/dd135715.aspx` ); } } - function _eitherStorageConnectionStringOrCheckpointLeaseManager(options: EventProcessorHostOptions): void { + function _eitherStorageConnectionStringOrCheckpointLeaseManager( + options: EventProcessorHostOptions + ): void { validateType("options", options, true, "object"); const checkpointManager = options.checkpointManager; const leaseManager = options.leaseManager; @@ -131,12 +142,14 @@ export namespace HostContext { if (storageConnectionString) { if (checkpointManager || leaseManager) { throw new Error( - "Either provide ('checkpointManager' and 'leaseManager') or " + "provide 'storageConnectionString'." + "Either provide ('checkpointManager' and 'leaseManager') or " + + "provide 'storageConnectionString'." ); } } else if (!(checkpointManager && leaseManager)) { throw new Error( - "Either provide ('checkpointManager' and 'leaseManager') or " + "provide 'storageConnectionString'." + "Either provide ('checkpointManager' and 'leaseManager') or " + + "provide 'storageConnectionString'." ); } } @@ -148,10 +161,15 @@ export namespace HostContext { const leaseRenewInterval = options.leaseRenewInterval; if (leaseManager) { if (leaseDuration || leaseRenewInterval) { - throw new Error("Either provide ('leaseDuration' and 'leaseRenewInterval') or " + "provide 'leaseManager'."); + throw new Error( + "Either provide ('leaseDuration' and 'leaseRenewInterval') or " + + "provide 'leaseManager'." + ); } } else if (!(leaseDuration && leaseRenewInterval)) { - throw new Error("Either provide ('leaseDuration' and 'leaseRenewInterval') or " + "provide 'leaseManager'."); + throw new Error( + "Either provide ('leaseDuration' and 'leaseRenewInterval') or " + "provide 'leaseManager'." + ); } } @@ -161,12 +179,16 @@ export namespace HostContext { const onEphErrorFunc: OnEphError = () => { // do nothing }; - const config = EventHubConnectionConfig.create(options.eventHubConnectionString!, options.eventHubPath); + const config = EventHubConnectionConfig.create( + options.eventHubConnectionString!, + options.eventHubPath + ); // set defaults if (!options.consumerGroup) options.consumerGroup = defaultConsumerGroup; if (!options.eventHubPath) options.eventHubPath = config.entityPath; - if (!options.leaseRenewInterval) options.leaseRenewInterval = defaultLeaseRenewIntervalInSeconds; + if (!options.leaseRenewInterval) + options.leaseRenewInterval = defaultLeaseRenewIntervalInSeconds; if (!options.leaseDuration) options.leaseDuration = defaultLeaseDurationInSeconds; if (!options.onEphError) options.onEphError = onEphErrorFunc; if (!options.dataTransformer) options.dataTransformer = new DefaultDataTransformer(); @@ -176,8 +198,18 @@ export namespace HostContext { validateType("options", options, true, "object"); validateType("options.eventHubPath", options.eventHubPath, true, "string"); - validateType("options.eventHubConnectionString", options.eventHubConnectionString, true, "string"); - validateType("options.storageConnectionString", options.storageConnectionString, false, "string"); + validateType( + "options.eventHubConnectionString", + options.eventHubConnectionString, + true, + "string" + ); + validateType( + "options.storageConnectionString", + options.storageConnectionString, + false, + "string" + ); validateType("options.initialOffset", options.initialOffset, false, "object"); validateType("options.consumerGroup", options.consumerGroup, false, "string"); validateType("options.storageContainerName", options.storageContainerName, false, "string"); @@ -254,9 +286,13 @@ export namespace HostContext { { userAgent: ctxt.userAgent } ); } else { - return EventHubClient.createFromConnectionString(ctxt.eventHubConnectionString, ctxt.eventHubPath, { - userAgent: ctxt.userAgent - }); + return EventHubClient.createFromConnectionString( + ctxt.eventHubConnectionString, + ctxt.eventHubPath, + { + userAgent: ctxt.userAgent + } + ); } }; ctxt.getHubRuntimeInformation = async () => { @@ -289,8 +325,14 @@ export namespace HostContext { return ctxt; } - function _createWithPumpManager(hostName: string, options: EventProcessorHostOptions): HostContextWithPumpManager { - const context = _createWithCheckpointLeaseManager(hostName, options) as HostContextWithPumpManager; + function _createWithPumpManager( + hostName: string, + options: EventProcessorHostOptions + ): HostContextWithPumpManager { + const context = _createWithCheckpointLeaseManager( + hostName, + options + ) as HostContextWithPumpManager; context.pumpManager = new PumpManager(context); return context; } @@ -299,9 +341,9 @@ export namespace HostContext { * @property {string} userAgent The user agent string for the EventHubs client. * See guideline at https://github.com/Azure/azure-sdk/blob/master/docs/design/Telemetry.mdk */ - const userAgent: string = `azsdk-js-azureeventprocessorhost/${packageInfo.version} (NODE-VERSION ${ - process.version - }; ${os.type()} ${os.release()})`; + const userAgent: string = `azsdk-js-azureeventprocessorhost/${ + packageInfo.version + } (NODE-VERSION ${process.version}; ${os.type()} ${os.release()})`; /** * @ignore diff --git a/sdk/eventhub/event-processor-host/src/modelTypes.ts b/sdk/eventhub/event-processor-host/src/modelTypes.ts index 736ebc9b1d6f..c2b80989a666 100644 --- a/sdk/eventhub/event-processor-host/src/modelTypes.ts +++ b/sdk/eventhub/event-processor-host/src/modelTypes.ts @@ -2,7 +2,13 @@ // Licensed under the MIT License. import { PartitionContext } from "./partitionContext"; -import { EventData, MessagingError, EventPosition, TokenProvider, ClientOptionsBase } from "@azure/event-hubs"; +import { + EventData, + MessagingError, + EventPosition, + TokenProvider, + ClientOptionsBase +} from "@azure/event-hubs"; import { CheckpointManager } from "./checkpointManager"; import { LeaseManager } from "./leaseManager"; diff --git a/sdk/eventhub/event-processor-host/src/partitionContext.ts b/sdk/eventhub/event-processor-host/src/partitionContext.ts index afc99cf5f892..0312c04a0563 100644 --- a/sdk/eventhub/event-processor-host/src/partitionContext.ts +++ b/sdk/eventhub/event-processor-host/src/partitionContext.ts @@ -54,7 +54,11 @@ export class PartitionContext { * @param {string} owner The name of the owner. * @param {CompleteLease} lease The lease object. */ - constructor(context: HostContextWithCheckpointLeaseManager, partitionId: string, lease: CompleteLease) { + constructor( + context: HostContextWithCheckpointLeaseManager, + partitionId: string, + lease: CompleteLease + ) { this._context = context; this.partitionId = partitionId; this.lease = lease; @@ -117,7 +121,9 @@ export class PartitionContext { * @ignore */ async getInitialOffset(): Promise { - const startingCheckpoint = await this._context.checkpointManager.getCheckpoint(this.partitionId); + const startingCheckpoint = await this._context.checkpointManager.getCheckpoint( + this.partitionId + ); const withHostAndPartiton = this._context.withHostAndPartition; let result: EventPosition; if (!startingCheckpoint) { @@ -130,7 +136,8 @@ export class PartitionContext { result = this._context.initialOffset || EventPosition.fromOffset(this._offset); } else { if (startingCheckpoint.offset != undefined) this._offset = startingCheckpoint.offset; - if (startingCheckpoint.sequenceNumber != undefined) this._sequenceNumber = startingCheckpoint.sequenceNumber; + if (startingCheckpoint.sequenceNumber != undefined) + this._sequenceNumber = startingCheckpoint.sequenceNumber; result = EventPosition.fromOffset(this._offset); log.partitionContext( withHostAndPartiton(this, "Retrieved starting offset/sequence " + "number: %s/%d"), @@ -139,7 +146,10 @@ export class PartitionContext { ); } log.partitionContext( - withHostAndPartiton(this, "Initial position provider offset: %s, " + "sequenceNumber: %d, enqueuedTime: %d"), + withHostAndPartiton( + this, + "Initial position provider offset: %s, " + "sequenceNumber: %d, enqueuedTime: %d" + ), result.offset, result.sequenceNumber, result.enqueuedTime @@ -153,7 +163,9 @@ export class PartitionContext { private async _persistCheckpoint(checkpoint: CheckpointInfo): Promise { const withHostAndPartiton = this._context.withHostAndPartition; try { - const inStoreCheckpoint = await this._context.checkpointManager.getCheckpoint(checkpoint.partitionId); + const inStoreCheckpoint = await this._context.checkpointManager.getCheckpoint( + checkpoint.partitionId + ); if (inStoreCheckpoint && inStoreCheckpoint.sequenceNumber >= checkpoint.sequenceNumber) { const msg = `Ignoring out of date checkpoint with offset: '${checkpoint.offset}', ` + @@ -165,7 +177,10 @@ export class PartitionContext { } log.partitionContext(withHostAndPartiton(this, "Persisting the checkpoint: %O."), checkpoint); await this._context.checkpointManager.updateCheckpoint(this.lease, checkpoint); - log.partitionContext(withHostAndPartiton(this, "Successfully persisted the checkpoint: %O."), checkpoint); + log.partitionContext( + withHostAndPartiton(this, "Successfully persisted the checkpoint: %O."), + checkpoint + ); } catch (err) { const msg = `An error occurred while checkpointing info for partition ` + diff --git a/sdk/eventhub/event-processor-host/src/partitionManager.ts b/sdk/eventhub/event-processor-host/src/partitionManager.ts index ad4f70442649..a7c47fe29367 100644 --- a/sdk/eventhub/event-processor-host/src/partitionManager.ts +++ b/sdk/eventhub/event-processor-host/src/partitionManager.ts @@ -58,7 +58,9 @@ export class PartitionManager { try { await localRunTask; } catch (err) { - const msg = `An error occurred while stopping the run task: ` + `${err ? err.stack : JSON.stringify(err)}.`; + const msg = + `An error occurred while stopping the run task: ` + + `${err ? err.stack : JSON.stringify(err)}.`; log.error(withHost("%s"), msg); } finally { this._isRunning = false; @@ -72,7 +74,9 @@ export class PartitionManager { shouldStop(): boolean { if (this._isCancelRequested) { log.partitionManager( - this._context.withHost("Cancellation was requested -> %s. " + "Hence stopping further execution."), + this._context.withHost( + "Cancellation was requested -> %s. " + "Hence stopping further execution." + ), this._isCancelRequested ); } @@ -117,7 +121,8 @@ export class PartitionManager { await this._context.pumpManager.removeAllPumps(CloseReason.shutdown); } catch (err) { const msg = - `An error occurred while shutting down the partition ` + `manager: ${err ? err.stack : JSON.stringify(err)}.`; + `An error occurred while shutting down the partition ` + + `manager: ${err ? err.stack : JSON.stringify(err)}.`; log.error(withHost("%s"), msg); this._context.onEphError({ hostName: this._context.hostName, @@ -206,7 +211,8 @@ export class PartitionManager { log.partitionManager(withHost("Ensure that the checkpoint exists.")); const checkpointConfig: RetryConfig = { hostName: hostName, - operation: () => checkpointManager.createAllCheckpointsIfNotExists(this._context.partitionIds), + operation: () => + checkpointManager.createAllCheckpointsIfNotExists(this._context.partitionIds), retryMessage: "Failure creating checkpoint for partition, retrying", finalFailureMessage: "Out of retries for creating checkpoint for partition", action: EPHActionStrings.creatingCheckpoint, @@ -228,12 +234,17 @@ export class PartitionManager { } const didSteal = await this._partitionScanner.scan(isFirst); log.partitionManager(withHost("Did we steal any leases in this scan: %s."), didSteal); - let seconds: number = didSteal ? this._context.fastScanInterval! : this._context.slowScanInterval!; + let seconds: number = didSteal + ? this._context.fastScanInterval! + : this._context.slowScanInterval!; if (isFirst) { seconds = this._context.startupScanDelay!; isFirst = false; } - log.partitionManager(withHost("Sleeping for %d seconds before starting the next scan."), seconds); + log.partitionManager( + withHost("Sleeping for %d seconds before starting the next scan."), + seconds + ); await delay(seconds * 1000); } } diff --git a/sdk/eventhub/event-processor-host/src/partitionPump.ts b/sdk/eventhub/event-processor-host/src/partitionPump.ts index a1c397b03da9..2c822998264e 100644 --- a/sdk/eventhub/event-processor-host/src/partitionPump.ts +++ b/sdk/eventhub/event-processor-host/src/partitionPump.ts @@ -69,7 +69,10 @@ export class PartitionPump { await this._createNewReceiver(); await this._scheduleLeaseRenewer(); log.partitionPump( - withHostAndPartition(this._lease, "Successfully started the receiver and scheduled lease renewer.") + withHostAndPartition( + this._lease, + "Successfully started the receiver and scheduled lease renewer." + ) ); } @@ -84,7 +87,10 @@ export class PartitionPump { this._client = this._context.getEventHubClient(); } catch (err) { log.error( - withHostAndPartition(partitionId, "An error occurred while creating " + "the eventhub client: %O."), + withHostAndPartition( + partitionId, + "An error occurred while creating " + "the eventhub client: %O." + ), err ); throw err; @@ -156,7 +162,10 @@ export class PartitionPump { let result: boolean = true; let error: Error | undefined; log.partitionPump( - withHostAndPartition(this._lease, "Lease renewer is active after " + "%d seconds. Trying to renew the lease"), + withHostAndPartition( + this._lease, + "Lease renewer is active after " + "%d seconds. Trying to renew the lease" + ), this._context.leaseRenewInterval ); try { @@ -179,7 +188,10 @@ export class PartitionPump { } if (!result) { log.error( - withHostAndPartition(this._lease, "Failed to renew the lease, result: %s. " + "Shutting down the receiver."), + withHostAndPartition( + this._lease, + "Failed to renew the lease, result: %s. " + "Shutting down the receiver." + ), result ); await this._removeReceiver(CloseReason.leaseLost); @@ -199,7 +211,10 @@ export class PartitionPump { try { await this._leaseRenewer(); } catch (err) { - log.error(withHostAndPartition(this._lease, "An error occurred in the _leaseRenewer(): %O"), err); + log.error( + withHostAndPartition(this._lease, "An error occurred in the _leaseRenewer(): %O"), + err + ); } }, renewalTime); } @@ -272,7 +287,9 @@ export class PartitionPump { let result = false; if (error) { // condition is "amqp:link:stolen" - if ((error as MessagingError).condition === ErrorNameConditionMapper.ReceiverDisconnectedError) { + if ( + (error as MessagingError).condition === ErrorNameConditionMapper.ReceiverDisconnectedError + ) { result = true; } else if (error.message.match(/.*New receiver with higher epoch.*/i) !== null) { result = true; diff --git a/sdk/eventhub/event-processor-host/src/partitionScanner.ts b/sdk/eventhub/event-processor-host/src/partitionScanner.ts index 1a20a6f0a088..fa0ae456b926 100644 --- a/sdk/eventhub/event-processor-host/src/partitionScanner.ts +++ b/sdk/eventhub/event-processor-host/src/partitionScanner.ts @@ -46,13 +46,21 @@ export class PartitionScanner { const stealThese = await this._findLeasesToSteal(remainingNeeded); log.partitionScanner(withHost("Number of lease found to steal: %d."), stealThese.length); didSteal = await this._stealLeases(stealThese); - log.partitionScanner(withHost("Have succesfully stolen: %d leases -> %s."), stealThese.length, didSteal); + log.partitionScanner( + withHost("Have succesfully stolen: %d leases -> %s."), + stealThese.length, + didSteal + ); } else { - log.partitionScanner(withHost("No need to scan further since remaining needed: %d."), remainingNeeded); + log.partitionScanner( + withHost("No need to scan further since remaining needed: %d."), + remainingNeeded + ); } } catch (err) { didSteal = false; - const msg = `An error occurred while scanning leases: ` + `${err ? err.stack : JSON.stringify(err)}.`; + const msg = + `An error occurred while scanning leases: ` + `${err ? err.stack : JSON.stringify(err)}.`; log.error(withHost("%s"), hostName, msg); const info: EPHDiagnosticInfo = { action: EPHActionStrings.scanningLeases, @@ -85,7 +93,10 @@ export class PartitionScanner { private _sortLeasesAndCalculateDesiredCount(isFirst: boolean): number { const hostName: string = this._context.hostName; const withHost = this._context.withHost; - log.partitionScanner(withHost("Accounting input: allLeaseStates count is: %d"), this._allLeaseStates.length); + log.partitionScanner( + withHost("Accounting input: allLeaseStates count is: %d"), + this._allLeaseStates.length + ); const uniqueOwners: Set = new Set(); uniqueOwners.add(hostName); let ourLeasesCount = 0; @@ -133,7 +144,11 @@ export class PartitionScanner { startingPoint = countPerHost * hostOrdinal; } // rotate this._allLeaseStates - log.partitionScanner(withHost("Host ordinal: %d. Rotating leases to start at: %d."), hostOrdinal, startingPoint); + log.partitionScanner( + withHost("Host ordinal: %d. Rotating leases to start at: %d."), + hostOrdinal, + startingPoint + ); if (startingPoint !== 0) { const rotatedList: Array = []; for (let i = 0; i < this._allLeaseStates.length; i++) { @@ -183,7 +198,12 @@ export class PartitionScanner { if (startAt < this._allLeaseStates.length) { const lease = this._allLeaseStates[startAt]; const partitionId = lease ? lease.partitionId : "undefined"; - log.partitionScanner(withHost("Examining chunk at '%s': [%d], needed %d."), partitionId, startAt, needed); + log.partitionScanner( + withHost("Examining chunk at '%s': [%d], needed %d."), + partitionId, + startAt, + needed + ); } else { log.partitionScanner(withHost("Examining chunk skipping, startAt is off end: %d"), startAt); } @@ -191,7 +211,11 @@ export class PartitionScanner { if (needed > 0 && this._unownedCount > 0 && startAt < this._allLeaseStates.length) { let runningNeeded = needed; const endAt = Math.min(startAt + needed, this._allLeaseStates.length); - log.partitionScanner(withHost("Finding expired leases from inclusive position range %d - %d"), startAt, endAt); + log.partitionScanner( + withHost("Finding expired leases from inclusive position range %d - %d"), + startAt, + endAt + ); const getThese: BaseLease[] = this._findExpiredLeases(startAt, endAt); const leaseManager = this._context.leaseManager; const getTheseResult: Promise[] = []; @@ -199,7 +223,7 @@ export class PartitionScanner { let lease: CompleteLease | undefined = undefined; const getThisPromise = leaseManager .getLease(thisLease.partitionId) - .then(receivedLease => { + .then((receivedLease) => { lease = receivedLease; if (lease) { return leaseManager.acquireLease(lease); @@ -207,10 +231,12 @@ export class PartitionScanner { return false; } }) - .then(acquired => { + .then((acquired) => { if (acquired) { runningNeeded--; - log.partitionScanner(withHostAndPartition(thisLease, "Acquired unowned/expired lease.")); + log.partitionScanner( + withHostAndPartition(thisLease, "Acquired unowned/expired lease.") + ); if (this._leaseOwnedByOthers.has(lease!.partitionId)) { this._leaseOwnedByOthers.delete(lease!.partitionId); this._unownedCount--; @@ -221,7 +247,7 @@ export class PartitionScanner { } return Promise.resolve(); }) - .catch(err => { + .catch((err) => { const msg = `An error occurred while getting/acquiring lease for partitionId ` + `'${thisLease.partitionId}': ${err ? err.stack : JSON.stringify(err)}`; @@ -239,7 +265,7 @@ export class PartitionScanner { return resultPromise.then(() => { return Promise.all(getTheseResult) - .catch(err => { + .catch((err) => { const msg = `An error occurred while getting/acquiring leases for some partitionId: ` + `${err ? err.stack : JSON.stringify(err)}`; @@ -319,7 +345,7 @@ export class PartitionScanner { let lease: CompleteLease | undefined = undefined; const tryStealPromise: Promise = this._context.leaseManager .getLease(stealableLease.partitionId) - .then(receivedLease => { + .then((receivedLease) => { lease = receivedLease; if (receivedLease) { return this._context.leaseManager.acquireLease(receivedLease); @@ -327,14 +353,16 @@ export class PartitionScanner { return false; } }) - .then(acquired => { + .then((acquired) => { if (acquired) { this._context.pumpManager.addPump(lease!).catch(); - log.partitionScanner(withHostAndPartition(stealableLease, "Successfully stolen the lease.")); + log.partitionScanner( + withHostAndPartition(stealableLease, "Successfully stolen the lease.") + ); } return acquired; }) - .catch(err => { + .catch((err) => { const msg = `An error occurred while stealing the lease for partitionId ` + `'${stealableLease.partitionId}': ${err ? err.stack : JSON.stringify(err)}`; @@ -355,7 +383,7 @@ export class PartitionScanner { // If we found at least one case where the lease could not be stolen then `.some()` // returns true. The final result will be true if `.some()` was not able to find a single // lease that could not be stolen. - const result = !stealResult.some(x => { + const result = !stealResult.some((x) => { return !x; }); return result; diff --git a/sdk/eventhub/event-processor-host/src/pumpManager.ts b/sdk/eventhub/event-processor-host/src/pumpManager.ts index d3edf8847745..5dc384fd8e10 100644 --- a/sdk/eventhub/event-processor-host/src/pumpManager.ts +++ b/sdk/eventhub/event-processor-host/src/pumpManager.ts @@ -31,15 +31,26 @@ export class PumpManager { await this.removePump(partitionId, CloseReason.shutdown); } else { log.pumpManager( - withHostAndPartition(partitionId, "Updating lease for pump since it" + "is open -> %s."), + withHostAndPartition( + partitionId, + "Updating lease for pump since it" + "is open -> %s." + ), partitionId, isOpen ); capturedPump.lease = lease; } } else { - log.pumpManager(withHostAndPartition(partitionId, "Creating a new pump with lease %o."), lease.getInfo()); - const pump = new PartitionPump(this._context, lease, this._context.onMessage!, this._context.onError!); + log.pumpManager( + withHostAndPartition(partitionId, "Creating a new pump with lease %o."), + lease.getInfo() + ); + const pump = new PartitionPump( + this._context, + lease, + this._context.onMessage!, + this._context.onError! + ); await pump.start(); } } catch (err) { diff --git a/sdk/eventhub/event-processor-host/src/util/utils.ts b/sdk/eventhub/event-processor-host/src/util/utils.ts index fc0f884c039a..c00996ee0f12 100644 --- a/sdk/eventhub/event-processor-host/src/util/utils.ts +++ b/sdk/eventhub/event-processor-host/src/util/utils.ts @@ -27,7 +27,9 @@ export function validateType( type: "string" | "number" | "boolean" | "Array" | "object" | "Date" | "function" ): void { if (required && paramValue == undefined) { - throw new TypeError(`${paramName} is required. Given value: ${paramValue}. Hence it cannot be null or undefined.`); + throw new TypeError( + `${paramName} is required. Given value: ${paramValue}. Hence it cannot be null or undefined.` + ); } if (paramValue != undefined) { if (type === "Array") { @@ -141,7 +143,12 @@ export async function retry(config: RetryConfig): Promise { config.partitionId ); } else { - log.util("[%s] Retry attempt: %d. Action '%s' suceeded.", config.hostName, retryCount, config.action); + log.util( + "[%s] Retry attempt: %d. Action '%s' suceeded.", + config.hostName, + retryCount, + config.action + ); } } catch (err) { innerError = err;