Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions packages/core/src/utils/authentication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,24 @@ export const validateAuthorizationHeader = async <T extends object>(
destination,
...(content && { content }),
});

const signature = Uint8Array.from(atob(hash as string), (c) =>
c.charCodeAt(0),
);
const signingKeyBytes = Uint8Array.from(atob(signingKey as string), (c) =>
c.charCodeAt(0),
);
const messageBytes = new TextEncoder().encode(canonicalJson);
const isValid = nacl.sign.detached.verify(
messageBytes,
signature,
signingKeyBytes,
);

if (
!nacl.sign.detached.verify(
new TextEncoder().encode(canonicalJson),
signature,
Uint8Array.from(atob(signingKey as string), (c) => c.charCodeAt(0)),
)
) {
throw new Error(`Invalid signature for ${destination}`);
if (!isValid) {
throw new Error(
`Invalid signature from ${origin} for request to ${destination}`,
);
}

return true;
Expand Down
174 changes: 147 additions & 27 deletions packages/core/src/utils/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,62 +1,182 @@
import { type OutgoingHttpHeaders } from 'node:http';
import { type IncomingHttpHeaders } from 'node:http';
import https from 'node:https';

type RequestOptions = Parameters<typeof https.request>[1];

export type MultipartResult = {
content: Buffer;
headers?: Record<string, string>;
redirect?: string;
};

/**
* parses Matrix federation multipart/mixed media responses according to spec.
* https://spec.matrix.org/v1.15/server-server-api/#get_matrixfederationv1mediadownloadmediaid
*/
function parseMultipart(buffer: Buffer, boundary: string): MultipartResult {
const bufferStr = buffer.toString();

// check if the second part contains a Location header (CDN redirect)
// pattern: after first boundary and JSON part, look for Location header
const parts = bufferStr.split(`--${boundary}`);
if (parts.length >= 3) {
const secondPart = parts[2];
const locationMatch = secondPart.match(/\r?\nLocation:\s*(.+)\r?\n/i);

if (locationMatch) {
return {
content: Buffer.from(''),
redirect: locationMatch[1].trim(),
};
}
}

// find where the last part's content starts (after the last \r\n\r\n)
const lastHeaderEnd = buffer.lastIndexOf('\r\n\r\n');
if (lastHeaderEnd === -1) return { content: buffer };

const binaryStart = lastHeaderEnd + 4;
const closingBoundary = buffer.lastIndexOf(`\r\n--${boundary}`);

const content =
closingBoundary > binaryStart
? buffer.subarray(binaryStart, closingBoundary)
: buffer.subarray(binaryStart);

return { content };
}
Comment on lines +16 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Multipart parsing via Buffer.toString risks corruption; use Buffer-safe boundary scanning

Converting the whole payload to string can mangle binary content and mis-detect boundaries. Parse by boundary using Buffer indices and only decode header sections.

Apply this diff to replace parseMultipart:

-function parseMultipart(buffer: Buffer, boundary: string): MultipartResult {
-  const bufferStr = buffer.toString();
-
-  // check if the second part contains a Location header (CDN redirect)
-  // pattern: after first boundary and JSON part, look for Location header
-  const parts = bufferStr.split(`--${boundary}`);
-  if (parts.length >= 3) {
-    const secondPart = parts[2];
-    const locationMatch = secondPart.match(/\r?\nLocation:\s*(.+)\r?\n/i);
-
-    if (locationMatch) {
-      return {
-        content: Buffer.from(''),
-        redirect: locationMatch[1].trim(),
-      };
-    }
-  }
-
-  // find where the last part's content starts (after the last \r\n\r\n)
-  const lastHeaderEnd = buffer.lastIndexOf('\r\n\r\n');
-  if (lastHeaderEnd === -1) return { content: buffer };
-
-  const binaryStart = lastHeaderEnd + 4;
-  const closingBoundary = buffer.lastIndexOf(`\r\n--${boundary}`);
-
-  const content =
-    closingBoundary > binaryStart
-      ? buffer.subarray(binaryStart, closingBoundary)
-      : buffer.subarray(binaryStart);
-
-  return { content };
-}
+function parseMultipart(buffer: Buffer, boundary: string): MultipartResult {
+  const delim = Buffer.from(`--${boundary}`);
+  const nextDelimPrefix = Buffer.from(`\r\n--${boundary}`);
+  const parts: { headers: Buffer; body: Buffer }[] = [];
+
+  let idx = buffer.indexOf(delim);
+  while (idx !== -1) {
+    let start = idx + delim.length;
+    if (buffer[start] === 13 && buffer[start + 1] === 10) start += 2; // \r\n
+    const headerEnd = buffer.indexOf('\r\n\r\n', start, 'ascii');
+    if (headerEnd === -1) break;
+    const headersBuf = buffer.subarray(start, headerEnd);
+    const nextIdx = buffer.indexOf(nextDelimPrefix, headerEnd + 4, 'ascii');
+    const bodyEnd = nextIdx === -1 ? buffer.length : nextIdx;
+    parts.push({ headers: headersBuf, body: buffer.subarray(headerEnd + 4, bodyEnd) });
+    idx = nextIdx === -1 ? -1 : nextIdx + 2; // skip \r\n
+  }
+
+  // CDN redirect convention: Location header in second part
+  if (parts[1]) {
+    const h = parts[1].headers.toString('ascii');
+    const m = h.match(/\bLocation:\s*(.+)\r?$/mi);
+    if (m) return { content: Buffer.alloc(0), redirect: m[1].trim() };
+  }
+
+  return parts.length ? { content: parts[parts.length - 1].body } : { content: buffer };
+}

I can also add tests covering binary bodies and Location redirects on request.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function parseMultipart(buffer: Buffer, boundary: string): MultipartResult {
const bufferStr = buffer.toString();
// check if the second part contains a Location header (CDN redirect)
// pattern: after first boundary and JSON part, look for Location header
const parts = bufferStr.split(`--${boundary}`);
if (parts.length >= 3) {
const secondPart = parts[2];
const locationMatch = secondPart.match(/\r?\nLocation:\s*(.+)\r?\n/i);
if (locationMatch) {
return {
content: Buffer.from(''),
redirect: locationMatch[1].trim(),
};
}
}
// find where the last part's content starts (after the last \r\n\r\n)
const lastHeaderEnd = buffer.lastIndexOf('\r\n\r\n');
if (lastHeaderEnd === -1) return { content: buffer };
const binaryStart = lastHeaderEnd + 4;
const closingBoundary = buffer.lastIndexOf(`\r\n--${boundary}`);
const content =
closingBoundary > binaryStart
? buffer.subarray(binaryStart, closingBoundary)
: buffer.subarray(binaryStart);
return { content };
}
function parseMultipart(buffer: Buffer, boundary: string): MultipartResult {
const delim = Buffer.from(`--${boundary}`);
const nextDelimPrefix = Buffer.from(`\r\n--${boundary}`);
const parts: { headers: Buffer; body: Buffer }[] = [];
let idx = buffer.indexOf(delim);
while (idx !== -1) {
let start = idx + delim.length;
if (buffer[start] === 13 && buffer[start + 1] === 10) start += 2; // \r\n
const headerEnd = buffer.indexOf('\r\n\r\n', start, 'ascii');
if (headerEnd === -1) break;
const headersBuf = buffer.subarray(start, headerEnd);
const nextIdx = buffer.indexOf(nextDelimPrefix, headerEnd + 4, 'ascii');
const bodyEnd = nextIdx === -1 ? buffer.length : nextIdx;
parts.push({ headers: headersBuf, body: buffer.subarray(headerEnd + 4, bodyEnd) });
idx = nextIdx === -1 ? -1 : nextIdx + 2; // skip \r\n
}
// CDN redirect convention: Location header in second part
if (parts[1]) {
const h = parts[1].headers.toString('ascii');
const m = h.match(/\bLocation:\s*(.+)\r?$/mi);
if (m) return { content: Buffer.alloc(0), redirect: m[1].trim() };
}
return parts.length ? { content: parts[parts.length - 1].body } : { content: buffer };
}


function handleJson<T>(contentType: string, body: Buffer): Promise<T | null> {
if (!contentType.includes('application/json')) {
return Promise.resolve(null);
}

try {
return Promise.resolve(JSON.parse(body.toString()));
} catch {
return Promise.resolve(null);
}
}

function handleText(contentType: string, body: Buffer): Promise<string> {
if (!contentType.includes('text/')) {
return Promise.resolve('');
}

return Promise.resolve(body.toString());
}

function handleMultipart(
contentType: string,
body: Buffer,
): Promise<MultipartResult | null> {
if (!/\bmultipart\b/i.test(contentType)) {
return Promise.resolve(null);
}

// extract boundary from content-type header
const boundaryMatch = contentType.match(/boundary=([^;,\s]+)/i);
if (!boundaryMatch) {
return Promise.resolve(null);
}

// remove quotes if present
const boundary = boundaryMatch[1].replace(/^["']|["']$/g, '');
return Promise.resolve(parseMultipart(body, boundary));
}

// this fetch is used when connecting to a multihome server, same server hosting multiple homeservers, and we need to verify the cert with the right SNI (hostname), or else, cert check will fail due to connecting through ip and not hostname (due to matrix spec).
export async function fetch(url: URL, options: RequestInit) {
export async function fetch<T>(url: URL, options: RequestInit) {
const serverName = new URL(
`http://${(options.headers as OutgoingHttpHeaders).Host}` as string,
`http://${(options.headers as IncomingHttpHeaders).Host}` as string,
).hostname;

const requestParams: RequestOptions = {
host: url.hostname, // IP
port: url.port,
method: options.method,
path: url.pathname + url.search,
headers: {
...(options.headers as OutgoingHttpHeaders),
'content-type': 'application/json',
},
headers: options.headers as IncomingHttpHeaders,
servername: serverName,
};

try {
const response: { statusCode: number | undefined; body: string } =
await new Promise((resolve, reject) => {
const request = https.request(requestParams, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
resolve({
statusCode: res.statusCode,
body: data,
});
});
const response: {
statusCode: number | undefined;
body: Buffer;
headers: IncomingHttpHeaders;
} = await new Promise((resolve, reject) => {
const request = https.request(requestParams, (res) => {
const chunks: Buffer[] = [];

res.once('error', reject);

// TODO: Make @hs/core fetch size limit configurable
let total = 0;
const MAX_RESPONSE_BYTES = 50 * 1024 * 1024; // 50 MB

res.on('data', (chunk) => {
total += chunk.length;
if (total > MAX_RESPONSE_BYTES) {
request.destroy(new Error('Response exceeds size limit'));
return;
}
chunks.push(chunk);
});
request.on('error', (err) => {
reject(err);

Comment on lines +118 to +126
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Stop the response stream when size limit is exceeded

Destroying only the request may not halt the inbound response; explicitly destroy the response to stop I/O and free memory earlier.

Apply this diff:

-        res.on('data', (chunk) => {
+        res.on('data', (chunk) => {
           total += chunk.length;
           if (total > MAX_RESPONSE_BYTES) {
-            request.destroy(new Error('Response exceeds size limit'));
+            const err = new Error('Response exceeds size limit');
+            res.destroy(err);
+            request.destroy(err);
             return;
           }
           chunks.push(chunk);
         });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
res.on('data', (chunk) => {
total += chunk.length;
if (total > MAX_RESPONSE_BYTES) {
request.destroy(new Error('Response exceeds size limit'));
return;
}
chunks.push(chunk);
});
request.on('error', (err) => {
reject(err);
res.on('data', (chunk) => {
total += chunk.length;
if (total > MAX_RESPONSE_BYTES) {
const err = new Error('Response exceeds size limit');
res.destroy(err);
request.destroy(err);
return;
}
chunks.push(chunk);
});
🤖 Prompt for AI Agents
In packages/core/src/utils/fetch.ts around lines 118 to 126, the code only
destroys the request when the response exceeds MAX_RESPONSE_BYTES which may not
stop the inbound response stream; update the data handler to also destroy the
response object (e.g., call res.destroy(new Error('Response exceeds size
limit'))) when the limit is exceeded (you can still call request.destroy(...)
for safety), so the response stream is terminated immediately and memory/IO is
freed.

res.on('end', () => {
resolve({
statusCode: res.statusCode,
body: Buffer.concat(chunks),
headers: res.headers,
});
});
});

const signal = options.signal;
if (signal) {
const onAbort = () => request.destroy(new Error('Aborted'));
signal.addEventListener('abort', onAbort, { once: true });
request.once('close', () =>
signal.removeEventListener('abort', onAbort),
);
}

request.end(options.body);
request.on('error', (err) => {
reject(err);
});

// TODO: Make @hs/core fetch timeout configurable
request.setTimeout(20_000, () => {
request.destroy(new Error('Request timed out after 20s'));
});

request.end(options.body);
});
Comment on lines +154 to +155
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Guard unsupported RequestInit.body types

RequestInit.body can be Headers API types (ReadableStream, URLSearchParams, etc.). https.request only accepts string | Buffer. Without guarding, this can throw at runtime.

Apply this diff:

-      request.end(options.body);
+      const payload =
+        typeof options.body === 'string' || Buffer.isBuffer(options.body)
+          ? (options.body as string | Buffer)
+          : undefined;
+      if (options.body && !payload) {
+        request.destroy(new Error('Unsupported request body type'));
+        return;
+      }
+      request.end(payload);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
request.end(options.body);
});
const payload =
typeof options.body === 'string' || Buffer.isBuffer(options.body)
? (options.body as string | Buffer)
: undefined;
if (options.body && !payload) {
request.destroy(new Error('Unsupported request body type'));
return;
}
request.end(payload);
});
🤖 Prompt for AI Agents
In packages/core/src/utils/fetch.ts around lines 154-155, the call
request.end(options.body) can throw because RequestInit.body may be
non-string/Buffer types (ReadableStream, URLSearchParams, FormData, Blob,
ArrayBuffer, etc.); update the code to guard and normalize body types before
calling request.end: if body is undefined/null call request.end() with no arg;
if body is a string or Buffer pass it through; if body is URLSearchParams call
body.toString(); if body is ArrayBuffer or Observable/TypedArray/Blob convert to
Buffer; if body is a ReadableStream or Node stream pipe it into the request
instead of using request.end; for unsupported types reject the promise with a
clear error. Ensure headers like content-length are adjusted accordingly when
you convert the body.


const contentType = response.headers['content-type'] || '';

return {
ok: response.statusCode
? response.statusCode >= 200 && response.statusCode < 300
: false,
json: () => JSON.parse(response.body),
text: () => response.body,
buffer: () => response.body,
json: () => handleJson<T>(contentType, response.body),
text: () => handleText(contentType, response.body),
multipart: () => handleMultipart(contentType, response.body),
status: response.statusCode,
headers: response.headers,
};
} catch (err) {
return {
ok: false,
json: () => undefined,
text: () => (err instanceof Error ? err.message : String(err)),
status: undefined,
headers: {},
buffer: () => Buffer.from(''),
json: () => Promise.resolve(null),
text: () =>
Promise.resolve(err instanceof Error ? err.message : String(err)),
multipart: () => Promise.resolve(null),
};
}
}
23 changes: 21 additions & 2 deletions packages/federation-sdk/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ import { EventStagingRepository } from './repositories/event-staging.repository'
import { EventRepository } from './repositories/event.repository';
import { Key, KeyRepository } from './repositories/key.repository';
import { Lock, LockRepository } from './repositories/lock.repository';
import {
MatrixBridgedRoom,
MatrixBridgedRoomRepository,
} from './repositories/matrix-bridged-room.repository';
import { Room, RoomRepository } from './repositories/room.repository';
import { Server, ServerRepository } from './repositories/server.repository';
import { StateRepository, StateStore } from './repositories/state.repository';
import { Upload, UploadRepository } from './repositories/upload.repository';
import { ConfigService } from './services/config.service';
import { DatabaseConnectionService } from './services/database-connection.service';
import { EduService } from './services/edu.service';
Expand Down Expand Up @@ -86,20 +91,34 @@ export async function createFederationContainer(
useValue: db.collection<Server>('rocketchat_federation_servers'),
});

container.register<Collection<Upload>>('UploadCollection', {
useValue: db.collection<Upload>('rocketchat_uploads'),
});

container.register<Collection<MatrixBridgedRoom>>(
'MatrixBridgedRoomCollection',
{
useValue: db.collection<MatrixBridgedRoom>(
'rocketchat_matrix_bridged_rooms',
),
},
);

container.registerSingleton(EventRepository);
container.registerSingleton(EventStagingRepository);
container.registerSingleton(KeyRepository);
container.registerSingleton(LockRepository);
container.registerSingleton(RoomRepository);
container.registerSingleton(StateRepository);
container.registerSingleton(ServerRepository);
container.registerSingleton(MatrixBridgedRoomRepository);
container.registerSingleton(UploadRepository);

container.registerSingleton(FederationRequestService);
container.registerSingleton(FederationService);
container.registerSingleton(StateService);
container.registerSingleton(EventAuthorizationService);
container.registerSingleton('EventFetcherService', EventFetcherService);
container.registerSingleton(EventService);
container.registerSingleton(EventFetcherService);
container.registerSingleton(EventAuthorizationService);
container.registerSingleton(EventEmitterService);
container.registerSingleton(InviteService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { Collection } from 'mongodb';
import { inject, singleton } from 'tsyringe';

export type MatrixBridgedRoom = {
rid: string; // Rocket.Chat room ID
mri: string; // Matrix room ID
fromServer: string;
};

@singleton()
export class MatrixBridgedRoomRepository {
constructor(
@inject('MatrixBridgedRoomCollection')
private readonly collection: Collection<MatrixBridgedRoom>,
) {}

async findMatrixRoomId(rocketChatRoomId: string): Promise<string | null> {
const bridgedRoom = await this.collection.findOne({
rid: rocketChatRoomId,
});

return bridgedRoom?.mri || null;
}
}
26 changes: 26 additions & 0 deletions packages/federation-sdk/src/repositories/upload.repository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Collection } from 'mongodb';
import { inject, singleton } from 'tsyringe';

export type Upload = {
rid: string;
federation: {
mxcUri: string;
serverName: string;
mediaId: string;
};
};

@singleton()
export class UploadRepository {
constructor(
@inject('UploadCollection') private readonly collection: Collection<Upload>,
) {}

async findRocketChatRoomIdByMediaId(mediaId: string): Promise<string | null> {
const upload = await this.collection.findOne({
'federation.mediaId': mediaId,
});

return upload?.rid || null;
}
}
Loading