From ea9e92bf28f7a94a076f181300c61a0089346da2 Mon Sep 17 00:00:00 2001 From: redsolver Date: Sun, 2 Jul 2023 18:08:26 +0200 Subject: [PATCH] Add IPFS Object Store --- lib/store/create.dart | 10 ++ lib/store/ipfs.dart | 237 +++++++++++++++++++++++++++++++ lib/util/expect_status_code.dart | 9 ++ 3 files changed, 256 insertions(+) create mode 100644 lib/store/ipfs.dart create mode 100644 lib/util/expect_status_code.dart diff --git a/lib/store/create.dart b/lib/store/create.dart index 764f28d..d718959 100644 --- a/lib/store/create.dart +++ b/lib/store/create.dart @@ -6,6 +6,7 @@ import 'package:minio/minio.dart'; import 'package:s5_server/node.dart'; import 'base.dart'; +import 'ipfs.dart'; import 'local.dart'; import 'pixeldrain.dart'; import 's3.dart'; @@ -20,6 +21,7 @@ Map createStoresFromConfig( final localConfig = config['store']?['local']; final siaConfig = config['store']?['sia']; final pixeldrainConfig = config['store']?['pixeldrain']; + final ipfsConfig = config['store']?['ipfs']; final arweaveConfig = config['store']?['arweave']; final estuaryConfig = config['store']?['estuary']; @@ -71,6 +73,14 @@ Map createStoresFromConfig( ); } + if (ipfsConfig != null) { + stores['ipfs'] = IPFSObjectStore( + ipfsConfig['gatewayUrl'], + ipfsConfig['apiUrl'], + ipfsConfig['apiAuthorizationHeader'], + ); + } + /* if (metadataBridgeConfig != null) { stores['bridge'] = MetadataBridgeObjectStore( crypto: crypto, diff --git a/lib/store/ipfs.dart b/lib/store/ipfs.dart new file mode 100644 index 0000000..ed844a3 --- /dev/null +++ b/lib/store/ipfs.dart @@ -0,0 +1,237 @@ +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:http/http.dart' as http; +import 'package:lib5/constants.dart'; +import 'package:lib5/lib5.dart'; + +import 'package:s5_server/util/expect_status_code.dart'; +import 'base.dart'; + +class IPFSObjectStore extends ObjectStore { + final String gatewayUrl; + final String apiUrl; + final String? authorizationHeader; + + final httpClient = http.Client(); + + IPFSObjectStore(this.gatewayUrl, this.apiUrl, this.authorizationHeader); + + var availableHashes = {}; + var availableBaoOutboardHashes = {}; + + late final Map authHeaders; + + @override + Future init() async { + authHeaders = authorizationHeader == null + ? {} + : {'Authorization': authorizationHeader!}; + + final mkdirRes = await httpClient.post( + _getApiUri('/files/mkdir?arg=/s5/blob&parents=true&hash=blake3'), + headers: authHeaders, + ); + mkdirRes.expectStatusCode(200); + + final mkdirOutboardRes = await httpClient.post( + _getApiUri('/files/mkdir?arg=/s5/obao&parents=true&hash=blake3'), + headers: authHeaders, + ); + mkdirOutboardRes.expectStatusCode(200); + + final blobListRes = await httpClient.post( + _getApiUri('/files/ls?arg=/s5/blob&long=true'), + headers: authHeaders, + ); + for (final entry in (json.decode(blobListRes.body)['Entries'] ?? [])) { + final String name = entry['Name']; + availableHashes[Multihash.fromBase64Url(name)] = entry['Hash']; + } + + final outboardListRes = await httpClient.post( + _getApiUri('/files/ls?arg=/s5/obao&long=true'), + headers: authHeaders, + ); + for (final entry in (json.decode(outboardListRes.body)['Entries'] ?? [])) { + final String name = entry['Name']; + availableBaoOutboardHashes[Multihash.fromBase64Url(name)] = entry['Hash']; + } + } + + @override + final uploadsSupported = true; + + Uri _getApiUri(String path) { + return Uri.parse('$apiUrl/api/v0$path'); + } + + String getObjectPathForHash(Multihash hash, [String? ext]) { + if (ext != null) { + return '/s5/obao/${hash.toBase64Url()}'; + } + return '/s5/blob/${hash.toBase64Url()}'; + } + + @override + Future canProvide(Multihash hash, List types) async { + for (final type in types) { + if (type == storageLocationTypeArchive) { + if (availableHashes.containsKey(hash)) { + return true; + } + } else if (type == storageLocationTypeFile) { + if (availableHashes.containsKey(hash)) { + return true; + } + } else if (type == storageLocationTypeFull) { + if (availableHashes.containsKey(hash) && + availableBaoOutboardHashes.containsKey(hash)) { + return true; + } + } + } + return false; + } + + @override + Future provide(Multihash hash, List types) async { + for (final type in types) { + if (!(await canProvide(hash, [type]))) continue; + + final fileUrl = '$gatewayUrl/ipfs/${availableHashes[hash]!}'; + if (type == storageLocationTypeArchive) { + return StorageLocation( + storageLocationTypeArchive, + [], + calculateExpiry(Duration(days: 1)), + ); + } else if (type == storageLocationTypeFile) { + return StorageLocation( + storageLocationTypeFile, + [fileUrl], + calculateExpiry(Duration(hours: 1)), + ); + } else if (type == storageLocationTypeFull) { + final outboardUrl = + '$gatewayUrl/ipfs/${availableBaoOutboardHashes[hash]!}'; + return StorageLocation( + storageLocationTypeFull, + [fileUrl, outboardUrl], + calculateExpiry(Duration(hours: 1)), + ); + } + } + throw 'Could not provide hash $hash for types $types'; + } + + @override + Future contains(Multihash hash) async { + return availableHashes.containsKey(hash); + } + + @override + Future put( + Multihash hash, + Stream data, + int length, + ) async { + if (await contains(hash)) { + return; + } + + final uploadUrl = _getApiUri( + '/add?quieter=true&chunker=size-1048576&raw-leaves=true&hash=blake3&pin=true', + ); + + final request = http.MultipartRequest('POST', uploadUrl); + request.files.add(http.MultipartFile( + 'file', + data, + length, + )); + request.headers.addAll(authHeaders); + + final res = await request.send(); + final body = await res.stream.bytesToString(); + + if (res.statusCode != 200) { + throw Exception('IPFS upload failed: HTTP ${res.statusCode}: $body'); + } + final String cid = jsonDecode(body)['Hash']; + + final copyRes = await httpClient.post( + _getApiUri('/files/cp?arg=/ipfs/$cid&arg=${getObjectPathForHash(hash)}'), + headers: authHeaders, + ); + copyRes.expectStatusCode(200); + availableHashes[hash] = cid; + } + + @override + Future putBaoOutboardBytes(Multihash hash, Uint8List outboard) async { + if (availableBaoOutboardHashes.containsKey(hash)) { + return; + } + + final uploadUrl = _getApiUri( + '/add?quieter=true&chunker=size-1048576&raw-leaves=true&hash=blake3&pin=true', + ); + + final request = http.MultipartRequest('POST', uploadUrl); + request.files.add(http.MultipartFile.fromBytes( + 'file', + outboard, + )); + request.headers.addAll(authHeaders); + + final res = await request.send(); + final body = await res.stream.bytesToString(); + + if (res.statusCode != 200) { + throw Exception('IPFS upload failed: HTTP ${res.statusCode}: $body'); + } + final String cid = jsonDecode(body)['Hash']; + + final copyRes = await httpClient.post( + _getApiUri( + '/files/cp?arg=/ipfs/$cid&arg=${getObjectPathForHash(hash, 'obao')}'), + headers: authHeaders, + ); + copyRes.expectStatusCode(200); + availableBaoOutboardHashes[hash] = cid; + } + + @override + Future delete(Multihash hash) async { + if (availableBaoOutboardHashes.containsKey(hash)) { + final unpinRes = await httpClient.post( + _getApiUri('/pin/rm?arg=${availableBaoOutboardHashes[hash]!}'), + headers: authHeaders, + ); + unpinRes.expectStatusCode(200); + + final res = await httpClient.post( + _getApiUri('/files/rm?arg=${getObjectPathForHash(hash, 'obao')}'), + headers: authHeaders, + ); + res.expectStatusCode(200); + availableBaoOutboardHashes.remove(hash); + } + + if (availableHashes.containsKey(hash)) { + final unpinRes = await httpClient.post( + _getApiUri('/pin/rm?arg=${availableHashes[hash]!}'), + headers: authHeaders, + ); + unpinRes.expectStatusCode(200); + + final res = await httpClient.post( + _getApiUri('/files/rm?arg=${getObjectPathForHash(hash)}'), + headers: authHeaders, + ); + res.expectStatusCode(200); + availableHashes.remove(hash); + } + } +} \ No newline at end of file diff --git a/lib/util/expect_status_code.dart b/lib/util/expect_status_code.dart new file mode 100644 index 0000000..eb69d9a --- /dev/null +++ b/lib/util/expect_status_code.dart @@ -0,0 +1,9 @@ +import 'package:http/http.dart'; + +extension ExpectStatusCode on Response { + void expectStatusCode(int code) { + if (statusCode != code) { + throw 'HTTP $statusCode: $body (expected $code)'; + } + } +} \ No newline at end of file