Skip to content

Commit

Permalink
test(NODE-5197): move started event
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Nov 6, 2023
1 parent dd81180 commit b8b5173
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 20 deletions.
26 changes: 7 additions & 19 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
serverMonitoringMode: options.serverMonitoringMode
});
console.log(getFAASEnv());
this.isRunningInFaasEnv = getFAASEnv() != null;

const cancellationToken = this[kCancellationToken];
Expand Down Expand Up @@ -242,25 +241,19 @@ function useStreamingProtocol(monitor: Monitor, topologyVersion: TopologyVersion
function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
let start = now();
const topologyVersion = monitor[kServer].description.topologyVersion;
console.log('checkServer', topologyVersion);
const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, isAwaitable)
);

function failureHandler(err: Error) {
function failureHandler(err: Error, awaited: boolean) {
monitor[kConnection]?.destroy({ force: true });
monitor[kConnection] = undefined;

monitor.emit(
Server.SERVER_HEARTBEAT_FAILED,
new ServerHeartbeatFailedEvent(
monitor.address,
calculateDurationInMs(start),
err,
isAwaitable
)
new ServerHeartbeatFailedEvent(monitor.address, calculateDurationInMs(start), err, awaited)
);

const error = !(err instanceof MongoError)
Expand Down Expand Up @@ -307,7 +300,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {

connection.command(ns('admin.$cmd'), cmd, options, (err, hello) => {
if (err) {
return failureHandler(err);
return failureHandler(err, isAwaitable);
}

if (!('isWritablePrimary' in hello)) {
Expand All @@ -319,16 +312,12 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);

console.log('command', topologyVersion, hello.topologyVersion, hello);
const awaited = useStreamingProtocol(monitor, hello.topologyVersion);
monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, awaited)
new ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable)
);

// if we are using the streaming protocol then we immediately issue another `started`
// event, otherwise the "check" is complete and return to the main monitor loop
if (awaited) {
if (isAwaitable) {
monitor.emit(
Server.SERVER_HEARTBEAT_STARTED,
new ServerHeartbeatStartedEvent(monitor.address, true)
Expand All @@ -350,7 +339,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
if (err) {
monitor[kConnection] = undefined;

failureHandler(err);
failureHandler(err, false);
return;
}

Expand All @@ -371,7 +360,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
monitor.address,
calculateDurationInMs(start),
conn.hello,
false
useStreamingProtocol(monitor, conn.hello?.topologyVersion)
)
);

Expand Down Expand Up @@ -404,7 +393,6 @@ function monitorServer(monitor: Monitor) {
}

// if the check indicates streaming is supported, immediately reschedule monitoring
console.log('checkServerCallback', hello?.topologyVersion);
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
setTimeout(() => {
if (!isInCloseState(monitor)) {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1823,7 +1823,7 @@ describe('Change Streams', function () {
});
});

describe.only('ChangeStream resumability', function () {
describe('ChangeStream resumability', function () {
let client: MongoClient;
let collection: Collection;
let changeStream: ChangeStream;
Expand Down

0 comments on commit b8b5173

Please sign in to comment.