Commit: use HEAD request to check upstream servers

hzrd149 committed Mar 20, 2024 · 1 parent 256e2ee · commit 89e1c4d

Showing 3 changed files with 54 additions and 53 deletions.
1 change: 1 addition & 0 deletions package.json
```diff
@@ -1,5 +1,6 @@
 {
   "name": "blossom-server",
+  "private": true,
   "version": "0.1.0",
   "description": "Generic blob storage and retrieval for nostr",
   "main": "index.js",
```
84 changes: 40 additions & 44 deletions src/api.ts
```diff
@@ -8,7 +8,7 @@ import { NostrEvent } from "@nostr-dev-kit/ndk";
 import Router from "@koa/router";
 
 import { config } from "./config.js";
-import { BlobSearch } from "./types.js";
+import { BlobPointer, BlobSearch } from "./types.js";
 import * as cacheModule from "./cache/index.js";
 import * as cdnDiscovery from "./discover/upstream.js";
 import * as nostrDiscovery from "./discover/nostr.js";
```
```diff
@@ -227,7 +227,7 @@ router.get("/:hash", async (ctx, next) => {
       const minExpiration = getExpirationTime(rule);
       if (!expiration || expiration < minExpiration) {
         setBlobExpiration(cachePointer.hash, minExpiration);
-        log("Reset expiration for", cachePointer.hash, "from", expiration, "to", minExpiration);
+        log("Reset expiration for", cachePointer.hash, "to", rule.expiration);
       }
     }
   }
```
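For context on the hunk above: the handler never shortens a cached blob's lifetime; it only pushes the expiration out when a matching rule demands a later one. A compressed sketch of that check, where the `expirations` map, the `Rule` shape, and `getExpirationTime` are illustrative assumptions rather than the repo's real definitions:

```ts
type Rule = { expiration: number /* seconds a matching blob should be kept */ };

// hash -> unix timestamp after which the blob may be pruned (assumption for illustration)
const expirations = new Map<string, number>();

const getExpirationTime = (rule: Rule) => Math.floor(Date.now() / 1000) + rule.expiration;

// Only ever extend: if the rule's minimum lands later than the stored
// expiration (or none is stored), move the expiration out to it.
function resetExpiration(hash: string, rule: Rule) {
  const expiration = expirations.get(hash);
  const minExpiration = getExpirationTime(rule);
  if (!expiration || expiration < minExpiration) {
    expirations.set(hash, minExpiration);
  }
}
```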
```diff
@@ -240,53 +240,49 @@ router.get("/:hash", async (ctx, next) => {
     return;
   }
 
+  const pointers: BlobPointer[] = [];
+
   if (config.discovery.nostr.enabled) {
-    let pointers = await nostrDiscovery.search(search);
-    if (pointers.length) {
-      for (const pointer of pointers) {
-        try {
-          if (pointer.type === "http") {
-            const stream = await httpTransport.readHTTPPointer(pointer);
-            if (pointer.mimeType) ctx.type = pointer.mimeType;
-            const pass = (ctx.body = new PassThrough());
-            stream.pipe(pass);
-
-            // save to cache
-            const rule = getFileRule(
-              { mimeType: pointer.mimeType, pubkey: pointer.metadata?.pubkey },
-              config.cache.rules,
-            );
-            if (rule) {
-              uploadModule.uploadWriteStream(stream).then((upload) => {
-                if (upload.hash !== pointer.hash) return;
-                setBlobExpiration(upload.hash, getExpirationTime(rule));
-                cacheModule.saveBlob(upload.hash, upload.tempFile, pointer.metadata?.mimeType || upload.mimeType);
-              });
-            }
-            return;
-          }
-        } catch (e) {}
-      }
-    }
+    let nostrPointers = await nostrDiscovery.search(search);
+    for (const pointer of nostrPointers) pointers.push(pointer);
   }
 
   if (config.discovery.upstream.enabled) {
-    const cdnSource = await cdnDiscovery.search(search);
-    if (cdnSource) {
-      if (search.ext) ctx.type = search.ext;
-      const pass = (ctx.body = new PassThrough());
-      cdnSource.pipe(pass);
-
-      // save to cache
-      const rule = getFileRule({ mimeType: search.mimeType, pubkey: search.pubkey }, config.cache.rules);
-      if (rule) {
-        uploadModule.uploadWriteStream(cdnSource).then((upload) => {
-          if (upload.hash !== search.hash) return;
-          setBlobExpiration(upload.hash, getExpirationTime(rule));
-          cacheModule.saveBlob(upload.hash, upload.tempFile, search.mimeType || upload.mimeType);
-        });
-      }
-    }
+    const cdnPointer = await cdnDiscovery.search(search);
+    if (cdnPointer) pointers.push(cdnPointer);
   }
 
+  for (const pointer of pointers) {
+    try {
+      if (pointer.type === "http") {
+        const stream = await httpTransport.readHTTPPointer(pointer);
+
+        // set mime type
+        if (!ctx.type && pointer.mimeType) ctx.type = pointer.mimeType;
+        if (!ctx.type && search.mimeType) ctx.type = search.mimeType;
+
+        const pass = (ctx.body = new PassThrough());
+        stream.pipe(pass);
+
+        // save to cache
+        const rule = getFileRule(
+          { mimeType: pointer.mimeType || search.mimeType, pubkey: pointer.metadata?.pubkey || search.pubkey },
+          config.cache.rules,
+        );
+        if (rule) {
+          uploadModule.uploadWriteStream(stream).then((upload) => {
+            if (upload.hash !== pointer.hash) return;
+            setBlobExpiration(upload.hash, getExpirationTime(rule));
+            cacheModule.saveBlob(
+              upload.hash,
+              upload.tempFile,
+              pointer.mimeType || pointer.metadata?.mimeType || upload.mimeType || search.mimeType,
+            );
+          });
+        }
+        return;
+      }
+    } catch (e) {}
+  }
+
   if (!ctx.body) throw new httpError.NotFound("Cant find blob for hash");
```
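The shape of this refactor is easier to see outside the diff: instead of each discovery branch streaming (and caching) as a side effect, both sources now just contribute `BlobPointer`s to one list, and a single loop tries the pointers in order. A minimal sketch of that flow, using simplified stand-in types rather than the repo's real `BlobPointer`/`BlobSearch` definitions:

```ts
import { PassThrough, Readable } from "stream";

// Simplified stand-ins for the repo's pointer types (assumptions for illustration).
type HTTPPointer = { type: "http"; url: string; hash: string; mimeType?: string };
type BlobPointer = HTTPPointer;

interface DiscoverySource {
  enabled: boolean;
  search(hash: string): Promise<BlobPointer[]>;
}

// Phase 1: collect candidates from every enabled source before touching any of them.
async function discover(sources: DiscoverySource[], hash: string): Promise<BlobPointer[]> {
  const pointers: BlobPointer[] = [];
  for (const source of sources) {
    if (!source.enabled) continue;
    pointers.push(...(await source.search(hash)));
  }
  return pointers;
}

// Phase 2: try pointers one at a time and stream the first that works;
// a failed pointer is swallowed so the next candidate gets a chance.
async function streamFirst(
  pointers: BlobPointer[],
  read: (pointer: HTTPPointer) => Promise<Readable>,
): Promise<PassThrough | undefined> {
  for (const pointer of pointers) {
    try {
      const stream = await read(pointer);
      const pass = new PassThrough();
      stream.pipe(pass);
      return pass; // the real handler also tees the stream into the cache here
    } catch (e) {
      // dead upstream: fall through to the next pointer
    }
  }
  return undefined;
}
```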
22 changes: 13 additions & 9 deletions src/discover/upstream.ts
```diff
@@ -1,7 +1,6 @@
 import debug from "debug";
-import type { IncomingMessage } from "http";
 import followRedirects from "follow-redirects";
-import { BlobSearch } from "../types.js";
+import { BlobSearch, HTTPPointer } from "../types.js";
 import { config } from "../config.js";
 const { http, https } = followRedirects;
 
```
```diff
@@ -17,21 +16,26 @@ export async function search(search: BlobSearch) {
   }
 }
 
-async function checkCDN(cdn: string, search: BlobSearch): Promise<IncomingMessage> {
-  const { hash, ext } = search;
-  return new Promise((resolve, reject) => {
-    const url = new URL(hash + (ext || ""), cdn);
+async function checkCDN(cdn: string, search: BlobSearch): Promise<HTTPPointer> {
+  const { hash, ext, pubkey } = search;
+  return new Promise<HTTPPointer>((resolve, reject) => {
+    const url = new URL(hash, cdn);
     const backend = url.protocol === "https:" ? https : http;
 
-    backend.get(url.toString(), (res) => {
+    const request = backend.request(url.toString(), { method: "HEAD", timeout: 5 * 1000 }, (res) => {
       if (!res.statusCode) return reject();
       if (res.statusCode < 200 || res.statusCode >= 400) {
         res.destroy();
-        reject(new Error(res.statusMessage));
+        reject(new Error("Not Found"));
       } else {
-        resolve(res);
+        log("Found", hash + ext || "", "at", cdn);
+        resolve({ type: "http", url: url.toString(), hash: search.hash, metadata: { pubkey } });
       }
     });
+
+    request.on("timeout", () => {
+      request.destroy();
+      reject(new Error("Timeout"));
+    });
   });
 }
```
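For reference, the core pattern the new `checkCDN` relies on — a HEAD request, so the upstream confirms it has the blob without transferring the body, plus a hard timeout so a hung server cannot stall discovery — can be sketched with Node's built-in `https` module alone. This simplified version deliberately drops `follow-redirects`, so unlike the real code it will not chase 3xx responses:

```ts
import { request } from "https";

// Resolve true when the upstream answers a HEAD request with 2xx/3xx within the timeout.
function headCheck(url: URL, timeout = 5_000): Promise<boolean> {
  return new Promise((resolve, reject) => {
    const req = request(url, { method: "HEAD", timeout }, (res) => {
      res.destroy(); // HEAD responses carry no body; free the socket immediately
      if (!res.statusCode) return reject(new Error("No status code"));
      resolve(res.statusCode >= 200 && res.statusCode < 400);
    });
    req.on("timeout", () => {
      req.destroy(); // the timeout event does not abort the request by itself
      reject(new Error("Timeout"));
    });
    req.on("error", reject);
    req.end(); // actually send the request
  });
}
```

Note that Node's `timeout` option only arms a socket timer; it is the explicit `destroy()` in the `timeout` handler that aborts the request, which is why the commit registers both.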
