Skip to content

Commit

Permalink
feat: configurable block providers
Browse files Browse the repository at this point in the history
Adds configurable block providers to allow using bitswap but also other
methods such as trustless gateways and any yet-to-be-invented way of
resolving a CID to a block..
  • Loading branch information
achingbrain committed Oct 9, 2023
1 parent 51316ba commit 8ee0646
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 72 deletions.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
"docs": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs -- --exclude packages/interop --excludeExternals",
"docs:no-publish": "NODE_OPTIONS=--max_old_space_size=8192 aegir docs --publish false -- --exclude packages/interop"
},
"dependencies": {
"any-signal": "^4.1.1"
},
"devDependencies": {
"aegir": "^41.0.0",
"npm-run-all": "^4.1.5",
Expand Down
20 changes: 20 additions & 0 deletions packages/helia/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@
],
"type": "module",
"types": "./dist/src/index.d.ts",
"typesVersions": {
"*": {
"*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
],
"src/*": [
"*",
"dist/*",
"dist/src/*",
"dist/src/*/index"
]
}
},
"files": [
"src",
"dist",
Expand All @@ -26,6 +42,10 @@
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
},
"./block-providers": {
"types": "./dist/src/block-providers/index.d.ts",
"import": "./dist/src/block-providers/index.js"
}
},
"eslintConfig": {
Expand Down
59 changes: 59 additions & 0 deletions packages/helia/src/block-providers/bitswap-block-provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { createBitswap } from 'ipfs-bitswap'
import type { BlockProvider } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Startable } from '@libp2p/interface/startable'
import type { Blockstore } from 'interface-blockstore'
import type { AbortOptions } from 'interface-store'
import type { Bitswap, BitswapNotifyProgressEvents, BitswapWantBlockProgressEvents } from 'ipfs-bitswap'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'
import type { ProgressOptions } from 'progress-events'

export class BitswapBlockProvider implements BlockProvider<
ProgressOptions<BitswapNotifyProgressEvents>,
ProgressOptions<BitswapWantBlockProgressEvents>
>, Startable {
private readonly bitswap: Bitswap
private started: boolean

constructor (libp2p: Libp2p, blockstore: Blockstore, hashers: MultihashHasher[]) {
this.bitswap = createBitswap(libp2p, blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await this.bitswap.start()
this.started = true
}

async stop (): Promise<void> {
await this.bitswap.stop()
this.started = false
}

notify (cid: CID, block: Uint8Array, options?: ProgressOptions<BitswapNotifyProgressEvents>): void {
this.bitswap.notify(cid, block, options)
}

async get (cid: CID, options?: AbortOptions & ProgressOptions<BitswapWantBlockProgressEvents>): Promise<Uint8Array> {
return this.bitswap.want(cid, options)
}
}
2 changes: 2 additions & 0 deletions packages/helia/src/block-providers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { BitswapBlockProvider } from './bitswap-block-provider.js'
export { TrustedGatewayBlockProvider } from './trustless-gateway-block-provider.js'
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import { logger } from '@libp2p/logger'
import type { BlockProvider } from '@helia/interface/blocks'
import type { AbortOptions } from 'interface-store'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

const log = logger('helia:trustless-gateway-block-provider')

export type TrustlessGatewayGetBlockProgressEvents =
ProgressEvent<'trustless-gateway:get-block:fetch', URL>

/**
* A BlockProvider that accepts a list of trustless gateways that are queried
* for blocks. Individual gateways are randomly chosen.
*/
export class TrustedGatewayBlockProvider implements BlockProvider<
ProgressOptions,
ProgressOptions<TrustlessGatewayGetBlockProgressEvents>
> {
private readonly gateways: URL[]

constructor (urls: string[]) {
this.gateways = urls.map(url => new URL(url.toString()))
}

notify (cid: CID, block: Uint8Array, options?: ProgressOptions): void {
/* no-op */
}

async get (cid: CID, options: AbortOptions & ProgressOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// choose a gateway
const url = this.gateways[Math.floor(Math.random() * this.gateways.length)]

log('getting block for %c from %s', cid, url)

try {
const block = await getRawBlockFromGateway(url, cid, options.signal)
log('got block for %c from %s', cid, url)

return block
} catch (err: any) {
log.error('failed to get block for %c from %s', cid, url, err)

throw err
}
}
}

async function getRawBlockFromGateway (url: URL, cid: CID, signal?: AbortSignal): Promise<Uint8Array> {
const gwUrl = new URL(url)
gwUrl.pathname = `/ipfs/${cid.toString()}`

// necessary as not every gateway supports dag-cbor, but every should support
// sending raw block as-is
gwUrl.search = '?format=raw'

if (signal?.aborted === true) {
throw new Error(`Signal to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted prior to fetch`)
}

try {
const res = await fetch(gwUrl.toString(), {
signal,
headers: {
// also set header, just in case ?format= is filtered out by some
// reverse proxy
Accept: 'application/vnd.ipld.raw'
},
cache: 'force-cache'
})
if (!res.ok) {
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${gwUrl.toString()}`)
}
return new Uint8Array(await res.arrayBuffer())
} catch (cause) {
// @ts-expect-error - TS thinks signal?.aborted can only be false now
// because it was checked for true above.
if (signal?.aborted === true) {
throw new Error(`fetching raw block for CID ${cid} from gateway ${gwUrl.toString()} was aborted`)
}
throw new Error(`unable to fetch raw block for CID ${cid}`)
}
}
38 changes: 5 additions & 33 deletions packages/helia/src/helia.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { start, stop } from '@libp2p/interface/startable'
import { logger } from '@libp2p/logger'
import { type Bitswap, createBitswap } from 'ipfs-bitswap'
import drain from 'it-drain'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { CustomProgressEvent } from 'progress-events'
import { PinsImpl } from './pins.js'
import { BlockStorage } from './storage.js'
Expand All @@ -15,7 +13,6 @@ import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
import type { CID } from 'multiformats/cid'
import type { MultihashHasher } from 'multiformats/hashes/interface'

const log = logger('helia')

Expand All @@ -31,34 +28,10 @@ export class HeliaImpl implements Helia {
public datastore: Datastore
public pins: Pins

#bitswap?: Bitswap

constructor (init: HeliaImplInit) {
const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

this.#bitswap = createBitswap(init.libp2p, init.blockstore, {
hashLoader: {
getHasher: async (codecOrName: string | number): Promise<MultihashHasher<number>> => {
const hasher = hashers.find(hasher => {
return hasher.code === codecOrName || hasher.name === codecOrName
})

if (hasher != null) {
return hasher
}

throw new Error(`Could not load hasher for code/name "${codecOrName}"`)
}
}
})

const networkedStorage = new NetworkedStorage(init.blockstore, {
bitswap: this.#bitswap
blockProviders: init.blockProviders,
hashers: init.hashers
})

this.pins = new PinsImpl(init.datastore, networkedStorage, init.dagWalkers ?? [])
Expand All @@ -72,14 +45,13 @@ export class HeliaImpl implements Helia {

async start (): Promise<void> {
await assertDatastoreVersionIsCurrent(this.datastore)

await this.#bitswap?.start()
await start(this.blockstore)
await this.libp2p.start()
}

async stop (): Promise<void> {
await this.libp2p.stop()
await this.#bitswap?.stop()
await stop(this.blockstore)
}

async gc (options: GCOptions = {}): Promise<void> {
Expand Down
43 changes: 42 additions & 1 deletion packages/helia/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@
import { logger } from '@libp2p/logger'
import { MemoryBlockstore } from 'blockstore-core'
import { MemoryDatastore } from 'datastore-core'
import { identity } from 'multiformats/hashes/identity'
import { sha256, sha512 } from 'multiformats/hashes/sha2'
import { BitswapBlockProvider } from './block-providers/bitswap-block-provider.js'
import { TrustedGatewayBlockProvider } from './block-providers/trustless-gateway-block-provider.js'
import { HeliaImpl } from './helia.js'
import { createLibp2p } from './utils/libp2p.js'
import { name, version } from './version.js'
import type { DefaultLibp2pServices } from './utils/libp2p-defaults.js'
import type { Helia } from '@helia/interface'
import type { BlockProvider } from '@helia/interface/blocks'
import type { Libp2p } from '@libp2p/interface'
import type { Blockstore } from 'interface-blockstore'
import type { Datastore } from 'interface-datastore'
Expand Down Expand Up @@ -91,6 +96,12 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
*/
dagWalkers?: DAGWalker[]

/**
* A list of strategies used to fetch blocks when they are not present in
* the local blockstore
*/
blockProviders?: BlockProvider[]

/**
* Pass `false` to not start the Helia node
*/
Expand All @@ -114,6 +125,23 @@ export interface HeliaInit<T extends Libp2p = Libp2p> {
holdGcLock?: boolean
}

const DEFAULT_TRUSTLESS_GATEWAYS = [
// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://dweb.link',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cf-ipfs.com',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://4everland.io',

// 2023-10-03: IPNS, Origin, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://w3s.link',

// 2023-10-03: IPNS, and Block/CAR support from https://ipfs-public-gateway-checker.on.fleek.co/
'https://cloudflare-ipfs.com'
]

/**
* Create and return a Helia node
*/
Expand All @@ -131,11 +159,24 @@ export async function createHelia (init: HeliaInit = {}): Promise<Helia<unknown>
libp2p = await createLibp2p(datastore, init.libp2p)
}

const hashers: MultihashHasher[] = [
sha256,
sha512,
identity,
...(init.hashers ?? [])
]

const blockProviders = init.blockProviders ?? [
new BitswapBlockProvider(libp2p, blockstore, hashers),
new TrustedGatewayBlockProvider(DEFAULT_TRUSTLESS_GATEWAYS)
]

const helia = new HeliaImpl({
...init,
datastore,
blockstore,
libp2p
libp2p,
blockProviders
})

if (init.start !== false) {
Expand Down
19 changes: 18 additions & 1 deletion packages/helia/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { start, stop, type Startable } from '@libp2p/interface/startable'
import createMortice from 'mortice'
import type { Blocks, Pair, DeleteManyBlocksProgressEvents, DeleteBlockProgressEvents, GetBlockProgressEvents, GetManyBlocksProgressEvents, PutManyBlocksProgressEvents, PutBlockProgressEvents, GetAllBlocksProgressEvents, GetOfflineOptions } from '@helia/interface/blocks'
import type { Pins } from '@helia/interface/pins'
Expand All @@ -21,10 +22,11 @@ export interface GetOptions extends AbortOptions {
* blockstore (that may be on disk, s3, or something else). If the blocks are
* not present Bitswap will be used to fetch them from network peers.
*/
export class BlockStorage implements Blocks {
export class BlockStorage implements Blocks, Startable {
public lock: Mortice
private readonly child: Blockstore
private readonly pins: Pins
private started: boolean

/**
* Create a new BlockStorage
Expand All @@ -35,6 +37,21 @@ export class BlockStorage implements Blocks {
this.lock = createMortice({
singleProcess: options.holdGcLock
})
this.started = false
}

isStarted (): boolean {
return this.started
}

async start (): Promise<void> {
await start(this.child)
this.started = true
}

async stop (): Promise<void> {
await stop(this.child)
this.started = false
}

unwrap (): Blockstore {
Expand Down
Loading

0 comments on commit 8ee0646

Please sign in to comment.