Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions sdk/eventhub/event-processor-host/samples/iothubEph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async function main(): Promise<void> {
await stopEph(eph);
}

main().catch(err => {
main().catch((err) => {
console.log("Exiting from main() due to an error: %O.", err);
});

Expand All @@ -58,7 +58,9 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
// Message handler
const partionCount: { [x: string]: number } = {};
const onMessage: OnReceivedMessage = async (context: PartitionContext, event: EventData) => {
!partionCount[context.partitionId] ? (partionCount[context.partitionId] = 1) : partionCount[context.partitionId]++;
!partionCount[context.partitionId]
? (partionCount[context.partitionId] = 1)
: partionCount[context.partitionId]++;
console.log(
"[%s] %d - Received message from partition: '%s', offset: '%s'",
ephName,
Expand All @@ -75,7 +77,11 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
eph.receivingFromPartitions
);
await context.checkpointFromEventData(event);
console.log("[%s] Successfully checkpointed message number %d", ephName, partionCount[context.partitionId]);
console.log(
"[%s] Successfully checkpointed message number %d",
ephName,
partionCount[context.partitionId]
);
} catch (err) {
console.log(
"[%s] An error occurred while checkpointing msg number %d: %O",
Expand Down
12 changes: 9 additions & 3 deletions sdk/eventhub/event-processor-host/samples/multiEph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async function main(): Promise<void> {
await stopEph(eph2);
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});

Expand Down Expand Up @@ -75,7 +75,9 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
// Message handler
const partionCount: { [x: string]: number } = {};
const onMessage: OnReceivedMessage = async (context: PartitionContext, event: EventData) => {
!partionCount[context.partitionId] ? (partionCount[context.partitionId] = 1) : partionCount[context.partitionId]++;
!partionCount[context.partitionId]
? (partionCount[context.partitionId] = 1)
: partionCount[context.partitionId]++;
console.log(
"[%s] %d - Received message from partition: '%s', offset: '%s'",
ephName,
Expand All @@ -92,7 +94,11 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
eph.receivingFromPartitions
);
await context.checkpointFromEventData(event);
console.log("[%s] Successfully checkpointed message number %d", ephName, partionCount[context.partitionId]);
console.log(
"[%s] Successfully checkpointed message number %d",
ephName,
partionCount[context.partitionId]
);
} catch (err) {
console.log(
"[%s] An error occurred while checkpointing msg number %d: %O",
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/event-processor-host/samples/sendBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ async function main(): Promise<void> {
console.log("Sending batch events...");

// Will concurrently send batched events to all the partitions.
const sendPromises = partitionIds.map(id => client.sendBatch(events, id));
const sendPromises = partitionIds.map((id) => client.sendBatch(events, id));

await Promise.all(sendPromises);
await client.close();
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
12 changes: 9 additions & 3 deletions sdk/eventhub/event-processor-host/samples/singleEph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async function main(): Promise<void> {
await stopEph(eph);
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});

Expand All @@ -68,7 +68,9 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
// Message handler
const partionCount: { [x: string]: number } = {};
const onMessage: OnReceivedMessage = async (context: PartitionContext, event: EventData) => {
!partionCount[context.partitionId] ? (partionCount[context.partitionId] = 1) : partionCount[context.partitionId]++;
!partionCount[context.partitionId]
? (partionCount[context.partitionId] = 1)
: partionCount[context.partitionId]++;
console.log(
"[%s] %d - Received message from partition: '%s', offset: '%s'",
ephName,
Expand All @@ -85,7 +87,11 @@ async function startEph(ephName: string): Promise<EventProcessorHost> {
eph.receivingFromPartitions
);
await context.checkpointFromEventData(event);
console.log("[%s] Successfully checkpointed message number %d", ephName, partionCount[context.partitionId]);
console.log(
"[%s] Successfully checkpointed message number %d",
ephName,
partionCount[context.partitionId]
);
} catch (err) {
console.log(
"[%s] An error occurred while checkpointing msg number %d: %O",
Expand Down
9 changes: 2 additions & 7 deletions sdk/eventhub/event-processor-host/samples/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
"compilerOptions": {
"module": "commonjs"
},
"include": [
"**/*.ts"
],
"exclude": [
"../node_modules",
"../typings/**",
]
"include": ["**/*.ts"],
"exclude": ["../node_modules", "../typings/**"]
}
23 changes: 19 additions & 4 deletions sdk/eventhub/event-processor-host/src/azureBlob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,16 @@ export class AzureBlob {
return this._blobService.getContent(this._containerName, this._blobPath, options);
}

changeLease(currentLeaseId: string, proposedLeaseId: string): Promise<StorageBlobService.LeaseResult> {
return this._blobService.changeLease(this._containerName, this._blobPath, currentLeaseId, proposedLeaseId);
changeLease(
currentLeaseId: string,
proposedLeaseId: string
): Promise<StorageBlobService.LeaseResult> {
return this._blobService.changeLease(
this._containerName,
this._blobPath,
currentLeaseId,
proposedLeaseId
);
}

getBlobProperties(): Promise<StorageBlobService.BlobResult> {
Expand All @@ -97,7 +105,12 @@ export class AzureBlob {
metadata: Dictionary<string>,
options?: StorageBlobService.BlobRequestOptions
): Promise<StorageBlobService.BlobResult> {
return this._blobService.setBlobMetadata(this._containerName, this._blobPath, metadata, options);
return this._blobService.setBlobMetadata(
this._containerName,
this._blobPath,
metadata,
options
);
}

listBlobsSegmented(
Expand All @@ -106,7 +119,9 @@ export class AzureBlob {
return this._blobService.listBlobsSegmented(this._containerName, options);
}

acquireLease(options: StorageBlobService.AcquireLeaseRequestOptions): Promise<StorageBlobService.LeaseResult> {
acquireLease(
options: StorageBlobService.AcquireLeaseRequestOptions
): Promise<StorageBlobService.LeaseResult> {
return this._blobService.acquireLease(this._containerName, this._blobPath, options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
await Promise.all(deleteBlobs);
await blobService.deleteContainerIfExists(storageContainerName);
} else {
throw new Error("'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs.");
throw new Error(
"'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs."
);
}
} catch (err) {
const msg =
Expand Down Expand Up @@ -189,7 +191,9 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
const leaseBlobs = await this._listBlobs();
if (leaseBlobs.length === partitionIds.length) {
log.checkpointLeaseMgr(
withHost("Number of blobs %d === Number of partitionIds %d. " + "Hence no need to create leases."),
withHost(
"Number of blobs %d === Number of partitionIds %d. " + "Hence no need to create leases."
),
leaseBlobs.length,
partitionIds.length
);
Expand Down Expand Up @@ -235,7 +239,10 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
returnLease = <AzureBlobLease>await this.getLease(partitionId);
} else {
log.error(
withHostAndPartition(partitionId, "An error occurred while creating lease if " + "it does not exist: %O."),
withHostAndPartition(
partitionId,
"An error occurred while creating lease if " + "it does not exist: %O."
),
error
);
throw error;
Expand Down Expand Up @@ -277,7 +284,10 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
return false;
}
log.checkpointLeaseMgr(
withHostAndPartition(lease, "Need to change lease '%s' -> '%s' " + "for partitionId '%s'."),
withHostAndPartition(
lease,
"Need to change lease '%s' -> '%s' " + "for partitionId '%s'."
),
lease.token,
newLeaseId,
lease.partitionId
Expand Down Expand Up @@ -364,7 +374,8 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
log.checkpointLeaseMgr(
withHostAndPartition(
lease,
"Let us renew the lease to make sure the " + "update with offset '%s' and sequence number %d will go through."
"Let us renew the lease to make sure the " +
"update with offset '%s' and sequence number %d will go through."
),
lease.offset,
lease.sequenceNumber
Expand Down Expand Up @@ -463,7 +474,9 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
log.checkpointLeaseMgr(withHost("Number of blobs: %d"), listResult.entries.length);
return listResult.entries;
} else {
throw new Error("'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs.");
throw new Error(
"'blobService' is not defined in the 'hostContext', hence cannot " + "list all the blobs."
);
}
}

Expand Down Expand Up @@ -521,7 +534,10 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
options.metadata[metadataOwnerName] = lease.owner || this._context.hostName;
}
log.checkpointLeaseMgr(
withHostAndPartition(lease, "Trying to upload raw JSON for activity " + "'%s': %s, with options: %o"),
withHostAndPartition(
lease,
"Trying to upload raw JSON for activity " + "'%s': %s, with options: %o"
),
activity,
jsonToUpload,
options
Expand All @@ -546,7 +562,11 @@ export class AzureStorageCheckpointLeaseManager implements CheckpointManager, Le
result = true;
}
}
log.error(withHostAndPartition(partitionId, "Was lease lost -> %s, err: %O."), result, getStorageError(err));
log.error(
withHostAndPartition(partitionId, "Was lease lost -> %s, err: %O."),
result,
getStorageError(err)
);
return result;
}
}
Loading