Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions apps/worker/src/jobs/link-unfurl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import { channelRoom } from "@repo/realtime-types/rooms"
import type { Emitter } from "@socket.io/redis-emitter"
import type { Job } from "bullmq"
import ogs from "open-graph-scraper"
import { logger } from "@/lib/logger"

const OG_FETCH_TIMEOUT_MS = 5000
const MAX_REDIRECTS = 5

const PRIVATE_IP_REGEX =
/^(127\.|10\.|172\.(1[6-9]|2\d|3[01])\.|192\.168\.|0\.|169\.254\.|::1|fc|fd|fe80)/i
Expand All @@ -32,11 +34,18 @@ async function isSafeUrl(urlString: string): Promise<boolean> {

const addresses = await lookup(hostname, { all: true })
for (const { address } of addresses) {
if (PRIVATE_IP_REGEX.test(address)) return false
if (PRIVATE_IP_REGEX.test(address)) {
logger.warn(
{ hostname, address },
"Blocked private IP after DNS lookup"
)
return false
}
}

return true
} catch {
} catch (err) {
logger.warn({ err, url: urlString }, "URL safety check failed")
Comment thread
BuckyMcYolo marked this conversation as resolved.
return false
}
}
Expand Down Expand Up @@ -68,15 +77,72 @@ function matchProxyRule(originalUrl: string) {
return null
}

/** Follow redirects manually, validating each hop through isSafeUrl. */
async function resolveRedirects(startUrl: string): Promise<string | null> {
let current = startUrl
for (let i = 0; i < MAX_REDIRECTS; i++) {
let res: Response
try {
res = await fetch(current, {
method: "HEAD",
headers: { "user-agent": "Townhall/1.0 OGBot" },
redirect: "manual",
signal: AbortSignal.timeout(OG_FETCH_TIMEOUT_MS),
})
} catch (err) {
logger.warn(
{ err, startUrl, current },
"Redirect resolution fetch failed"
)
return null
}

if (res.status >= 300 && res.status < 400) {
const location = res.headers.get("location")
if (!location) return null
// Resolve relative Location headers against the current URL
let next: string
try {
next = new URL(location, current).toString()
} catch {
logger.warn({ location, current }, "Malformed redirect Location header")
return null
}
if (!(await isSafeUrl(next))) {
logger.warn(
{ from: current, to: next },
"Redirect target failed safety check"
)
return null
}
current = next
continue
}

return current
}
logger.warn({ url: startUrl }, "Too many redirects")
return null
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

async function fetchOgEmbed(url: string): Promise<Embed | null> {
const proxy = matchProxyRule(url)
const fetchUrl = proxy?.fetchUrl ?? url

if (!(await isSafeUrl(fetchUrl))) return null
if (!(await isSafeUrl(fetchUrl))) {
logger.info({ url, fetchUrl }, "Skipped unsafe URL")
return null
}

const resolvedUrl = await resolveRedirects(fetchUrl)
if (!resolvedUrl) {
logger.info({ url, fetchUrl }, "Redirect resolution failed")
return null
}

try {
const { error, result } = await ogs({
url: fetchUrl,
url: resolvedUrl,
timeout: OG_FETCH_TIMEOUT_MS,
fetchOptions: {
headers: {
Expand All @@ -87,12 +153,18 @@ async function fetchOgEmbed(url: string): Promise<Embed | null> {
})

if (error || !result.success) {
logger.warn({ url, resolvedUrl, error }, "OG scrape returned no result")
return null
}

const imageUrl =
result.ogImage?.[0]?.url ?? result.twitterImage?.[0]?.url ?? undefined

logger.info(
{ url, title: result.ogTitle, hasImage: !!imageUrl },
"OG scrape succeeded"
)

return {
type: "link",
url,
Expand All @@ -102,7 +174,8 @@ async function fetchOgEmbed(url: string): Promise<Embed | null> {
thumbnail: imageUrl,
siteName: proxy?.siteName ?? result.ogSiteName ?? undefined,
}
} catch {
} catch (err) {
logger.error({ err, url, resolvedUrl }, "OG scrape threw")
return null
}
}
Expand All @@ -112,19 +185,36 @@ export function createLinkUnfurlProcessor(
) {
return async (job: Job<LinkUnfurlJobData>) => {
const { messageId, channelId, urls } = job.data
logger.info(
{ jobId: job.id, messageId, urlCount: urls.length },
"Processing link-unfurl job"
)

if (urls.length === 0) return

const results = await Promise.all(urls.map(fetchOgEmbed))
const embeds = results.filter((e): e is Embed => e !== null)
if (embeds.length === 0) return

if (embeds.length === 0) {
logger.info(
{ jobId: job.id, messageId },
"No embeds produced, clearing stored embeds"
)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

const [updated] = await db
.update(schema.message)
.set({ embeds })
.where(eq(schema.message.id, messageId))
.returning({ id: schema.message.id })

if (!updated) return
if (!updated) {
logger.warn(
{ jobId: job.id, messageId },
"Message not found for embed update"
)
return
}

const payload: RealtimeMessageEmbedsUpdated = {
channelId,
Expand All @@ -133,5 +223,9 @@ export function createLinkUnfurlProcessor(
}

emitter.to(channelRoom(channelId)).emit("message:embeds:updated", payload)
logger.info(
{ jobId: job.id, messageId, embedCount: embeds.length },
"Embeds updated and emitted"
)
}
}
Loading