Skip to content

Commit

Permalink
Add rate limit jitter, more tests, refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Oct 6, 2023
1 parent 9216abb commit 45f0b18
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 47 deletions.
26 changes: 18 additions & 8 deletions .gcloudignore
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
# This file specifies files that are *not* uploaded to Google Cloud
# This file specifies files that are _not_ uploaded to Google Cloud

# using gcloud. It follows the same syntax as .gitignore, with the addition of

# "#!include" directives (which insert the entries of the given .gitignore-style

# file at that point).

#

# For more information, run:
# $ gcloud topic gcloudignore

# $ gcloud topic gcloudignore

#

.DS\*Store
.gcloudignore
# If you would like to upload your .git directory, .gitignore file or files
# from your .gitignore file, remove the corresponding line
# below:
.git
.git/
.gitignore

node_modules
LICENSE
README.md
coverage/
example_openai.ts
node_modules
pnpm-lock.yaml
test/
79 changes: 56 additions & 23 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,48 +1,81 @@
'use strict';

import functions from '@google-cloud/functions-framework';
import OpenAI from 'openai';
import { validateEvent, verifySignature } from 'nostr-tools';
// We use lib instead of explicit functions so that we can stub them in our tests
import lib from './src/lib.js';

import functions from '@google-cloud/functions-framework';
let startTime;
const FUNCTION_TIMEOUT_MS = 60000;
const RATE_LIMIT_ERROR_CODE = 429;

// Assumes OPENAI_API_KEY has been set in the environment
// Keep this initialization in the global module scope so it can be reused
// across function invocations
const openai = new OpenAI();

functions.cloudEvent('nostrEventsPubSub', async (cloudEvent) => {
try {
const data = cloudEvent.data.message.data;
startTime = Date.now();

const eventJSON = data ? Buffer.from(data, 'base64').toString() : '{}';
const event = JSON.parse(eventJSON);

if (!validateEvent(event)) {
console.error('Invalid Nostr Event');
return;
}
try {
const event = getVerifiedEvent(cloudEvent.data.message.data);

if (!verifySignature(event)) {
console.error('Invalid Nostr Event Signature');
if (!event) {
return;
}

const response = await openai.moderations.create({ input: event.content });
const moderation = response.results[0];
const moderation = await getModeration(event);

if (!moderation.flagged) {
if (!moderation) {
console.log(`Nostr Event ${event.id} Passed Moderation. Skipping`);
return;
}

const moderationEvent = await lib.publishModerationResult(
event,
moderation
);
} catch (err) {
// For the moment log every error to the console and let the function finish
// Well handle retries
console.error(err);
await lib.publishModeration(event, moderation);
} catch (error) {
if (error?.response?.status === RATE_LIMIT_ERROR_CODE) {
console.error('Rate limit error. Adding random pause');
await randomPause();
throw error;
}
}
});

function getVerifiedEvent(data) {
const eventJSON = data ? Buffer.from(data, 'base64').toString() : '{}';
const event = JSON.parse(eventJSON);

if (!validateEvent(event)) {
console.error('Invalid Nostr Event');
return;
}

if (!verifySignature(event)) {
console.error('Invalid Nostr Event Signature');
return;
}

return event;
}

async function getModeration(event) {
const response = await openai.moderations.create({ input: event.content });
const moderation = response.results[0];

if (moderation.flagged) {
return moderation;
}
}

// Random pause within the window of half of the remaining available time before
// hitting timeout.
// https://platform.openai.com/docs/guides/rate-limits/error-mitigation
async function randomPause() {
const elapsedMs = Date.now() - startTime;
const remainingMs = FUNCTION_TIMEOUT_MS - elapsedMs;
const halfOfRemainingTime = remainingMs / 2;
const jitterTimeoutMs = Math.random() * halfOfRemainingTime;

await lib.waitMillis(jitterTimeoutMs);
}
16 changes: 11 additions & 5 deletions src/lib.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import NDK, { NDKEvent, NDKPrivateKeySigner } from '@nostr-dev-kit/ndk';
import { WebSocket } from 'ws';

if (!process.env.NOSTR_PRIVATE_KEY) {
throw new Error('NOSTR_PRIVATE_KEY environment variable is required');
}

// Hack to be able to have a global WebSocket object in Google Cloud Functions
global.WebSocket = WebSocket;

Expand All @@ -15,9 +19,6 @@ const RELAYS = [
'wss://rss.nos.social',
];

if (!process.env.NOSTR_PRIVATE_KEY) {
throw new Error('NOSTR_PRIVATE_KEY environment variable is required');
}
const signer = new NDKPrivateKeySigner(process.env.NOSTR_PRIVATE_KEY);
const userPromise = signer.user();

Expand Down Expand Up @@ -55,7 +56,7 @@ const MODERATION_CATEGORIES = {

// Creates a NIP-32 event flagging a Nostr event.
// See: https://github.com/nostr-protocol/nips/blob/master/32.md
async function publishModerationResult(moderatedNostrEvent, moderation) {
async function publishModeration(moderatedNostrEvent, moderation) {
// Ensure we are already connected and it was done once in the module scope
// during cold start
await connectedPromise;
Expand Down Expand Up @@ -111,7 +112,12 @@ function getContent(moderation) {
.trim();
}

async function waitMillis(millis) {
await new Promise((resolve) => setTimeout(resolve, millis));
}

// This trick is needed so that we can stub the function in our tests
export default {
publishModerationResult,
publishModeration,
waitMillis,
};
60 changes: 49 additions & 11 deletions test/unit.test.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { getFunction } from '@google-cloud/functions-framework/testing';
import '../index.js';

import assert from 'assert';
import sinon from 'sinon';
import lib from '../src/lib.js';
import { NDKEvent } from '@nostr-dev-kit/ndk';
import OpenAI from 'openai';
import '../index.js';

const nostrEvent = {
id: '4376c65d2f232afbe9b882a35baa4f6fe8667c4e684749af565f981833ed6a65',
Expand Down Expand Up @@ -32,19 +33,17 @@ const flaggedNostrEvent = {

describe('Function', () => {
beforeEach(function () {
sinon.stub(console, 'error');
sinon.stub(console, 'log');
sinon.spy(console, 'error');
sinon.spy(console, 'log');
sinon.stub(NDKEvent.prototype, 'publish').returns(Promise.resolve());
});

afterEach(function () {
NDKEvent.prototype.publish.restore();
console.log.restore();
console.error.restore();
sinon.restore();
});

it('should do nothing for a valid event that is not flagged', async () => {
sinon.stub(lib, 'publishModerationResult');
sinon.stub(lib, 'publishModeration');
const cloudEvent = { data: { message: {} } };
cloudEvent.data.message = {
data: Buffer.from(JSON.stringify(nostrEvent)).toString('base64'),
Expand All @@ -53,20 +52,22 @@ describe('Function', () => {
const nostrEventsPubSub = getFunction('nostrEventsPubSub');
await nostrEventsPubSub(cloudEvent);

assert.ok(lib.publishModerationResult.notCalled);
lib.publishModerationResult.restore();
assert.ok(lib.publishModeration.notCalled);
lib.publishModeration.restore();
});

it('should publish a moderation event for a valid event that is flagged', async () => {
const cloudEvent = { data: { message: {} } };
cloudEvent.data.message = {
data: Buffer.from(JSON.stringify(flaggedNostrEvent)).toString('base64'),
};

const waitMillisStub = sinon.stub(lib, 'waitMillis');
const nostrEventsPubSub = getFunction('nostrEventsPubSub');

await nostrEventsPubSub(cloudEvent);

assert.ok(NDKEvent.prototype.publish.called);
sinon.assert.notCalled(waitMillisStub);
});

it('should detect and invalid event', async () => {
Expand Down Expand Up @@ -99,9 +100,46 @@ describe('Function', () => {
assert.ok(console.error.calledWith('Invalid Nostr Event Signature'));
assert.ok(NDKEvent.prototype.publish.notCalled);
});

it('should add jitter pause after a rate limit error', async () => {
const cloudEvent = { data: { message: {} } };
const nEvent = { ...nostrEvent };
cloudEvent.data.message = {
data: Buffer.from(JSON.stringify(nEvent)).toString('base64'),
};

const nostrEventsPubSub = getFunction('nostrEventsPubSub');
const createModerationStub = sinon
.stub(OpenAI.Moderations.prototype, 'create')
.rejects({
response: {
status: 429,
},
});
const waitMillisStub = sinon.stub(lib, 'waitMillis');

// Ensure the rate limit error is still thrown after being handled so that
// we still trigger the pubsub topic subscription retry policy
await assert.rejects(nostrEventsPubSub(cloudEvent), (error) => {
return (
(error.response && error.response.status === 429) ||
new assert.AssertionError({ message: 'Expected a 429 status code' })
);
});

assert.ok(NDKEvent.prototype.publish.notCalled);

sinon.assert.calledWithMatch(
waitMillisStub,
sinon.match(
(number) => typeof number === 'number' && number > 0,
'positive number'
)
);
});
});

// Helper function to log all call arguments
// Helper function to log all stub call arguments during debugging
function logAllCallArguments(stub) {
for (let i = 0; i < stub.callCount; i++) {
const callArgs = stub.getCall(i).args;
Expand Down

0 comments on commit 45f0b18

Please sign in to comment.